From fb5676fac38c6eb90e202e9ced330ffe7c03a223 Mon Sep 17 00:00:00 2001 From: "Joe Talerico (rook)" Date: Fri, 9 Sep 2022 13:44:40 -0400 Subject: [PATCH] Organize the project and fix a bug (#9) - Organize a bit. Still some minor things to take care of, but mainly there. - If the user deleted the previous pods and they were in "terminating" state, the function of GetPods would get the pods in "terminating" state.. Pod phase is still running, but we needed to look for the DeletionTimestamp Signed-off-by: Joe Talerico (rook) Signed-off-by: Joe Talerico (rook) --- archive.go | 2 + netperf.go | 376 +++---------------------- kubernetes.go => netperf/kubernetes.go | 86 +++--- netperf/netperf.go | 308 ++++++++++++++++++++ result.go => netperf/result.go | 33 ++- 5 files changed, 425 insertions(+), 380 deletions(-) rename kubernetes.go => netperf/kubernetes.go (62%) create mode 100644 netperf/netperf.go rename result.go => netperf/result.go (79%) diff --git a/archive.go b/archive.go index ab4efb38..2665fe15 100644 --- a/archive.go +++ b/archive.go @@ -15,6 +15,8 @@ type ElasticParams struct { index string } +// Connect accepts ElasticParams which describe how to connect to ES. +// Returns a client connected to the desired ES Cluster. func Connect(es ElasticParams) (*elasticsearch.Client, error) { log.Infof("Connecting to ES - %s", es.url) esc := elasticsearch.Config{ diff --git a/netperf.go b/netperf.go index aa4565fb..a3e4b4b2 100644 --- a/netperf.go +++ b/netperf.go @@ -1,102 +1,24 @@ package main import ( - "bytes" "context" "flag" - "fmt" - "io/ioutil" "os" "strings" + "gihub.com/jtaleric/k8s-netperf/netperf" log "github.com/sirupsen/logrus" - "gopkg.in/yaml.v3" - apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/remotecommand" ) -type NetPerfResult struct { - NetPerfConfig - Metric string - SameNode bool - Sample float64 - Summary []float64 -} - -type ScenarioResults struct { - results []NetPerfResult -} - -type NetPerfConfig struct { - Duration int `yaml:"duration,omitempty"` - Profile string `yaml:"profile,omitempty"` - Samples int `yaml:"samples,omitempty"` - MessageSize int `yaml:"messagesize,omitempty"` - Service bool `default:"false" yaml:"service,omitempty"` -} - -type PerfScenarios struct { - configs []NetPerfConfig - client *apiv1.PodList - server *apiv1.PodList - clientAcross *apiv1.PodList - service *apiv1.Service - restconfig rest.Config -} - -type DeploymentParams struct { - name string - namespace string - replicas int32 - image string - labels map[string]string - command []string - podaffinity apiv1.PodAffinity - podantiaffinity apiv1.PodAntiAffinity - nodeaffinity apiv1.NodeAffinity - port int -} - -type ServiceParams struct { - name string - namespace string - labels map[string]string - ctlPort int32 - dataPort int32 -} - -const serverCtlPort = 12865 -const serverDataPort = 42424 - -func parseConf(fn string) ([]NetPerfConfig, error) { - fmt.Printf("📒 Reading %s file.\r\n", fn) - buf, err := ioutil.ReadFile(fn) - if err != nil { - return nil, err - } - c := make(map[string]NetPerfConfig) - err = yaml.Unmarshal(buf, &c) - if err != nil { - return nil, fmt.Errorf("In file %q: %v", fn, err) - } - // Ignore the key - // Pull out the specific tests - var tests []NetPerfConfig - for _, value := range c { - tests = append(tests, value) - } - return tests, nil -} - func main() { cfgfile := flag.String("config", 
"netperf.yml", "K8s netperf Configuration File") + nl := flag.Bool("local", false, "Run Netperf with pod/server on the same node") + hn := flag.Bool("hostnet", false, "Run Netperf with pod/server on hostnet (must have two worker nodes)") flag.Parse() - cfg, err := parseConf(*cfgfile) + cfg, err := netperf.ParseConf(*cfgfile) if err != nil { log.Error(err) os.Exit(1) @@ -116,200 +38,33 @@ func main() { log.Fatal(err) os.Exit(1) } - s := PerfScenarios{ - restconfig: *rconfig, - configs: cfg, - } - - // Check if nodes have the zone label to keep the netperf test - // in the same AZ/Zone versus across AZ/Zone - z, err := GetZone(client) - if err != nil { - log.Warn(err) + s := netperf.PerfScenarios{ + NodeLocal: *nl, + HostNetwork: *hn, + RestConfig: *rconfig, + Configs: cfg, } - // Get node count nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) if err != nil { - log.Fatal(err) - os.Exit(1) - } - ncount := len(nodes.Items) - - // Create Netperf client on the same node as the server. - cdp := DeploymentParams{ - name: "client", - namespace: "netperf", - replicas: 1, - image: "quay.io/jtaleric/k8snetperf:latest", - labels: map[string]string{"role": "client"}, - command: []string{"/bin/bash", "-c", "sleep 10000000"}, - port: serverCtlPort, - } - if z != "" { - cdp.nodeaffinity = apiv1.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ - { - Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, - }, - }, - }, - }, - } - } - _, err = CreateDeployment(cdp, client) - if err != nil { - log.Error("Unable to create Client deployment") - os.Exit(1) - } - // Wait for pod(s) in the deployment to be ready - _, err = WaitForReady(client, cdp) - if err != nil { - log.Error(err) - os.Exit(1) - } - // Retrieve the pod information - s.client, err = GetPods(client, cdp) - if err != nil { - log.Error(err) - os.Exit(1) - } - - // Create netperf TCP service - spTcp := ServiceParams{ - name: "netperf-service", - namespace: "netperf", - labels: map[string]string{"role": "server"}, - ctlPort: serverCtlPort, - dataPort: serverDataPort, - } - s.service, err = CreateService(spTcp, client) - if err != nil { - fmt.Println("😥 Unable to create TCP netperf service") - log.Error(err) - os.Exit(1) - } - - // Start netperf server - sdp := DeploymentParams{ - name: "server", - namespace: "netperf", - replicas: 1, - image: "quay.io/jtaleric/k8snetperf:latest", - labels: map[string]string{"role": "server"}, - command: []string{"/bin/bash", "-c", "netserver; sleep 10000000"}, - podaffinity: apiv1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client"}}, - }, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - port: serverCtlPort, - } - if z != "" { - sdp.nodeaffinity = apiv1.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ - { - Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, - }, - }, - }, - }, - } - } - if ncount > 1 { - sdp.podantiaffinity = 
apiv1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client-across"}}, - }, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - } - - } - _, err = CreateDeployment(sdp, client) - if err != nil { - fmt.Println("😥 Unable to create Server deployment") log.Error(err) os.Exit(1) } - _, err = WaitForReady(client, sdp) + err = netperf.BuildSUT(client, &s) if err != nil { log.Error(err) os.Exit(1) } - // Retrieve pods which match the server/client role labels - s.server, err = GetPods(client, sdp) - if err != nil { - log.Error(err) + if !s.NodeLocal && len(nodes.Items) < 2 { + log.Error("Node count too low to run pod to pod across nodes.") os.Exit(1) } - cdp_across := DeploymentParams{ - name: "client-across", - namespace: "netperf", - replicas: 1, - image: "quay.io/jtaleric/k8snetperf:latest", - labels: map[string]string{"role": "client-across"}, - command: []string{"/bin/bash", "-c", "sleep 10000000"}, - port: serverCtlPort, - } - if z != "" { - cdp_across.nodeaffinity = apiv1.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ - { - Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, - }, - }, - }, - }, - } - } - - if ncount > 1 { - _, err = CreateDeployment(cdp_across, client) - if err != nil { - log.Error("Unable to create Client deployment") - os.Exit(1) - } - // Wait for pod(s) in the deployment to be ready - _, err = WaitForReady(client, cdp_across) - if err != nil { - log.Error(err) - os.Exit(1) - } - // Retrieve the pod information - s.clientAcross, err = GetPods(client, cdp_across) - if err != nil { - log.Error(err) - os.Exit(1) - } - } - var sr ScenarioResults + var sr netperf.ScenarioResults // Run through each test - for _, nc := range s.configs { + for _, nc := range s.Configs { // Determine the metric for the test metric := string("OP/s") if strings.Contains(nc.Profile, "STREAM") { @@ -317,95 +72,52 @@ func main() { } var serverIP string if nc.Service { - serverIP = s.service.Spec.ClusterIP + serverIP = s.Service.Spec.ClusterIP } else { - serverIP = s.server.Items[0].Status.PodIP + serverIP = s.Server.Items[0].Status.PodIP } - if s.clientAcross != nil { - npr := NetPerfResult{} - npr.NetPerfConfig = nc + if !s.NodeLocal { + npr := netperf.Data{} + npr.Config = nc npr.Metric = metric npr.SameNode = false for i := 0; i < nc.Samples; i++ { - r, err := RunNetPerf(client, s.restconfig, nc, s.clientAcross, serverIP) + r, err := netperf.Run(client, s.RestConfig, nc, s.ClientAcross, serverIP) if err != nil { log.Error(err) os.Exit(1) } - nr, err := ParseResults(&r, nc) + nr, err := netperf.ParseResults(&r) if err != nil { log.Error(err) os.Exit(1) } npr.Summary = append(npr.Summary, nr) } - sr.results = append(sr.results, npr) - } - // Reset the result as we are now testing a different scenario - // Consider breaking the result per-scenario-config - npr := NetPerfResult{} - npr.NetPerfConfig = nc - npr.Metric = metric - npr.SameNode = true - for i := 0; i < nc.Samples; i++ { - r, err := RunNetPerf(client, s.restconfig, nc, s.client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err := ParseResults(&r, nc) - if err != nil { - log.Error(err) - os.Exit(1) + 
sr.Results = append(sr.Results, npr) + } else { + // Reset the result as we are now testing a different scenario + // Consider breaking the result per-scenario-config + npr := netperf.Data{} + npr.Config = nc + npr.Metric = metric + npr.SameNode = true + for i := 0; i < nc.Samples; i++ { + r, err := netperf.Run(client, s.RestConfig, nc, s.Client, serverIP) + if err != nil { + log.Error(err) + os.Exit(1) + } + nr, err := netperf.ParseResults(&r) + if err != nil { + log.Error(err) + os.Exit(1) + } + npr.Summary = append(npr.Summary, nr) } - npr.Summary = append(npr.Summary, nr) + sr.Results = append(sr.Results, npr) } - sr.results = append(sr.results, npr) - } - ShowStreamResult(sr) - ShowRRResult(sr) -} - -// Display the netperf config -func ShowConfig(c NetPerfConfig) { - fmt.Printf("🗒️ Running netperf %s (service %t) for %ds\r\n", c.Profile, c.Service, c.Duration) -} - -// RunNetPerf will use the k8s client to run the netperf binary in the container image -// it will return a bytes.Buffer of the stdout. -func RunNetPerf(c *kubernetes.Clientset, rc rest.Config, nc NetPerfConfig, client *apiv1.PodList, serverIP string) (bytes.Buffer, error) { - var stdout, stderr bytes.Buffer - pod := client.Items[0] - fmt.Printf("🔥 Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, serverIP) - ShowConfig(nc) - cmd := []string{"/usr/local/bin/netperf", "-H", serverIP, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize), "-P", fmt.Sprintf("0,%d", serverDataPort)} - req := c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return stdout, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.Stream(remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return stdout, err } - // Sound check stderr - return stdout, nil + netperf.ShowStreamResult(sr) + netperf.ShowRRResult(sr) } diff --git a/kubernetes.go b/netperf/kubernetes.go similarity index 62% rename from kubernetes.go rename to netperf/kubernetes.go index b87955a9..c74ec557 100644 --- a/kubernetes.go +++ b/netperf/kubernetes.go @@ -1,4 +1,4 @@ -package main +package netperf import ( "context" @@ -15,7 +15,7 @@ import ( // It will return a bool based on if the pods ever become ready before we move on. 
func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) { fmt.Println("⏰ Checking for Pods to become ready...") - dw, err := c.AppsV1().Deployments(dp.namespace).Watch(context.TODO(), metav1.ListOptions{}) + dw, err := c.AppsV1().Deployments(dp.Namespace).Watch(context.TODO(), metav1.ListOptions{}) if err != nil { return false, err } @@ -25,7 +25,7 @@ func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) { if !ok { fmt.Println("❌ Issue with the Deployment") } - if d.Name == dp.name { + if d.Name == dp.Name { if d.Status.ReadyReplicas == 1 { return true, nil } @@ -64,40 +64,40 @@ func GetZone(c *kubernetes.Clientset) (string, error) { } func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv1.Deployment, error) { - d, err := client.AppsV1().Deployments(dp.namespace).Get(context.TODO(), dp.name, metav1.GetOptions{}) + d, err := client.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) if err == nil { if d.Status.ReadyReplicas > 0 { fmt.Println("♻️ Using existing Deployment") return d, nil } } - fmt.Printf("🚀 Starting Deployment for %s in %s\n", dp.name, dp.namespace) - dc := client.AppsV1().Deployments(dp.namespace) + fmt.Printf("🚀 Starting Deployment for %s in %s\n", dp.Name, dp.Namespace) + dc := client.AppsV1().Deployments(dp.Namespace) deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: dp.name, + Name: dp.Name, }, Spec: appsv1.DeploymentSpec{ - Replicas: &dp.replicas, + Replicas: &dp.Replicas, Selector: &metav1.LabelSelector{ - MatchLabels: dp.labels, + MatchLabels: dp.Labels, }, Template: apiv1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: dp.labels, + Labels: dp.Labels, }, Spec: apiv1.PodSpec{ Containers: []apiv1.Container{ { - Name: dp.name, - Image: dp.image, - Command: dp.command, + Name: dp.Name, + Image: dp.Image, + Command: dp.Command, }, }, Affinity: &apiv1.Affinity{ - NodeAffinity: &dp.nodeaffinity, - PodAffinity: &dp.podaffinity, - PodAntiAffinity: &dp.podantiaffinity, + NodeAffinity: &dp.NodeAffinity, + PodAffinity: &dp.PodAffinity, + PodAntiAffinity: &dp.PodAntiAffinity, }, }, }, @@ -109,58 +109,66 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv // GetPods searches for a specific set of pods from DeploymentParms // It returns a PodList if the deployment is found. // NOTE : Since we can update the replicas to be > 1, is why I return a PodList. 
-func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (*apiv1.PodList, error) { - d, err := c.AppsV1().Deployments(dp.namespace).Get(context.TODO(), dp.name, metav1.GetOptions{}) +func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (apiv1.PodList, error) { + d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) + npl := apiv1.PodList{} if err != nil { - return nil, fmt.Errorf("❌ Failure to capture deployment") + return npl, fmt.Errorf("❌ Failure to capture deployment") } selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) if err != nil { - return nil, fmt.Errorf("❌ Failure to capture deployment label") + return npl, fmt.Errorf("❌ Failure to capture deployment label") } - pods, err := c.CoreV1().Pods(dp.namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"}) + pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"}) if err != nil { - return nil, fmt.Errorf("❌ Failure to capture pods") + return npl, fmt.Errorf("❌ Failure to capture pods") } - return pods, nil + for pod := range pods.Items { + if pods.Items[pod].DeletionTimestamp != nil { + continue + } else { + npl.Items = append(npl.Items, pods.Items[pod]) + } + } + return npl, nil } func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Service, error) { - s, err := client.CoreV1().Services(sp.namespace).Get(context.TODO(), sp.name, metav1.GetOptions{}) + s, err := client.CoreV1().Services(sp.Namespace).Get(context.TODO(), sp.Name, metav1.GetOptions{}) if err == nil { - fmt.Println("♻️ Using existing Deployment") + fmt.Println("♻️ Using existing Service") return s, nil } - fmt.Printf("🚀 Creating service for %s in %s\n", sp.name, sp.namespace) - sc := client.CoreV1().Services(sp.namespace) + fmt.Printf("🚀 Creating service for %s in %s\n", sp.Name, sp.Namespace) + sc := client.CoreV1().Services(sp.Namespace) service := &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: sp.name, - Namespace: sp.namespace, + Name: sp.Name, + Namespace: sp.Namespace, }, Spec: apiv1.ServiceSpec{ Ports: []apiv1.ServicePort{ { - Name: fmt.Sprintf("%s-ctl", sp.name), + Name: fmt.Sprintf("%s-ctl", sp.Name), Protocol: apiv1.ProtocolTCP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.ctlPort)), - Port: sp.ctlPort, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.CtlPort)), + Port: sp.CtlPort, }, { - Name: fmt.Sprintf("%s-data-tcp", sp.name), + Name: fmt.Sprintf("%s-data-tcp", sp.Name), Protocol: apiv1.ProtocolTCP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)), - Port: sp.dataPort, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)), + Port: sp.DataPort, }, { - Name: fmt.Sprintf("%s-data-udp", sp.name), + Name: fmt.Sprintf("%s-data-udp", sp.Name), Protocol: apiv1.ProtocolUDP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)), - Port: sp.dataPort, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)), + Port: sp.DataPort, }, }, Type: apiv1.ServiceType("ClusterIP"), - Selector: sp.labels, + Selector: sp.Labels, }, } return sc.Create(context.TODO(), service, metav1.CreateOptions{}) diff --git a/netperf/netperf.go b/netperf/netperf.go new file mode 100644 index 00000000..a6d387b9 --- /dev/null +++ b/netperf/netperf.go @@ -0,0 +1,308 @@ +package netperf + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + + "gopkg.in/yaml.v2" + apiv1 "k8s.io/api/core/v1" + metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +type Config struct { + Duration int `yaml:"duration,omitempty"` + Profile string `yaml:"profile,omitempty"` + Samples int `yaml:"samples,omitempty"` + MessageSize int `yaml:"messagesize,omitempty"` + Service bool `default:"false" yaml:"service,omitempty"` +} + +type DeploymentParams struct { + Name string + Namespace string + Replicas int32 + Image string + Labels map[string]string + Command []string + PodAffinity apiv1.PodAffinity + PodAntiAffinity apiv1.PodAntiAffinity + NodeAffinity apiv1.NodeAffinity + Port int +} + +type PerfScenarios struct { + NodeLocal bool + HostNetwork bool + Configs []Config + Client apiv1.PodList + Server apiv1.PodList + ClientAcross apiv1.PodList + Service *apiv1.Service + RestConfig rest.Config +} + +type ServiceParams struct { + Name string + Namespace string + Labels map[string]string + CtlPort int32 + DataPort int32 +} + +const ServerCtlPort = 12865 +const ServerDataPort = 42424 + +func BuildSUT(client *kubernetes.Clientset, s *PerfScenarios) error { + // Check if nodes have the zone label to keep the netperf test + // in the same AZ/Zone versus across AZ/Zone + z, err := GetZone(client) + if err != nil { + fmt.Println(err) + } + // Get node count + nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) + if err != nil { + return err + } + ncount := len(nodes.Items) + + if s.NodeLocal { + // Create Netperf client on the same node as the server. + cdp := DeploymentParams{ + Name: "client", + Namespace: "netperf", + Replicas: 1, + Image: "quay.io/jtaleric/k8snetperf:latest", + Labels: map[string]string{"role": "client"}, + Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Port: ServerCtlPort, + } + if z != "" { + cdp.NodeAffinity = apiv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: apiv1.NodeSelectorTerm{ + MatchExpressions: []apiv1.NodeSelectorRequirement{ + {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, + }, + }, + }, + }, + } + } + _, err = CreateDeployment(cdp, client) + if err != nil { + return fmt.Errorf("Unable to create Client deployment") + } + // Wait for pod(s) in the deployment to be ready + _, err = WaitForReady(client, cdp) + if err != nil { + return err + } + // Retrieve the pod information + s.Client, err = GetPods(client, cdp) + if err != nil { + return err + } + } + + // Create netperf TCP service + spTcp := ServiceParams{ + Name: "netperf-service", + Namespace: "netperf", + Labels: map[string]string{"role": "server"}, + CtlPort: ServerCtlPort, + DataPort: ServerDataPort, + } + s.Service, err = CreateService(spTcp, client) + if err != nil { + return fmt.Errorf("😥 Unable to create TCP netperf service") + } + cdp_across := DeploymentParams{ + Name: "client-across", + Namespace: "netperf", + Replicas: 1, + Image: "quay.io/jtaleric/k8snetperf:latest", + Labels: map[string]string{"role": "client-across"}, + Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Port: ServerCtlPort, + } + if z != "" { + cdp_across.NodeAffinity = apiv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: apiv1.NodeSelectorTerm{ + MatchExpressions: []apiv1.NodeSelectorRequirement{ + {Key: 
"topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, + }, + }, + }, + }, + } + } + + if ncount > 1 { + _, err = CreateDeployment(cdp_across, client) + if err != nil { + return fmt.Errorf("Unable to create Client deployment") + } + // Wait for pod(s) in the deployment to be ready + _, err = WaitForReady(client, cdp_across) + if err != nil { + return err + } + // Retrieve the pod information + s.ClientAcross, err = GetPods(client, cdp_across) + if err != nil { + return err + } + } + + // Start netperf server + sdp := DeploymentParams{ + Name: "server", + Namespace: "netperf", + Replicas: 1, + Image: "quay.io/jtaleric/k8snetperf:latest", + Labels: map[string]string{"role": "server"}, + Command: []string{"/bin/bash", "-c", "netserver; sleep 10000000"}, + PodAffinity: apiv1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: apiv1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client"}}, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Port: ServerCtlPort, + } + if z != "" { + sdp.NodeAffinity = apiv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: apiv1.NodeSelectorTerm{ + MatchExpressions: []apiv1.NodeSelectorRequirement{ + {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, + }, + }, + }, + }, + } + } + if ncount > 1 { + sdp.PodAntiAffinity = apiv1.PodAntiAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: apiv1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client-across"}}, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + } + + } + _, err = CreateDeployment(sdp, client) + if err != nil { + return fmt.Errorf("😥 Unable to create Server deployment") + } + _, err = WaitForReady(client, sdp) + if err != nil { + return err + } + // Retrieve pods which match the server/client role labels + s.Server, err = GetPods(client, sdp) + if err != nil { + return err + } + + return nil +} + +// ParseConfig will read in the netperf configuration file which +// describes which tests to run +// Returns Config struct +func ParseConf(fn string) ([]Config, error) { + fmt.Printf("📒 Reading %s file.\r\n", fn) + buf, err := ioutil.ReadFile(fn) + if err != nil { + return nil, err + } + c := make(map[string]Config) + err = yaml.Unmarshal(buf, &c) + if err != nil { + return nil, fmt.Errorf("In file %q: %v", fn, err) + } + // Ignore the key + // Pull out the specific tests + var tests []Config + for _, value := range c { + tests = append(tests, value) + } + return tests, nil +} + +// Display the netperf config +func ShowConfig(c Config) { + fmt.Printf("🗒️ Running netperf %s (service %t) for %ds\r\n", c.Profile, c.Service, c.Duration) +} + +// RunNetPerf will use the k8s client to run the netperf binary in the container image +// it will return a bytes.Buffer of the stdout. 
+func Run(c *kubernetes.Clientset, rc rest.Config, nc Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { + var stdout, stderr bytes.Buffer + pod := client.Items[0] + fmt.Printf("🔥 Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, serverIP) + ShowConfig(nc) + cmd := []string{"/usr/local/bin/netperf", "-H", serverIP, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize), "-P", fmt.Sprintf("0,%d", ServerDataPort)} + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + // Sound check stderr + return stdout, nil +} diff --git a/result.go b/netperf/result.go similarity index 79% rename from result.go rename to netperf/result.go index 7042be12..a4b727e7 100644 --- a/result.go +++ b/netperf/result.go @@ -1,4 +1,4 @@ -package main +package netperf import ( "bytes" @@ -10,6 +10,18 @@ import ( stats "github.com/montanaflynn/stats" ) +type Data struct { + Config + Metric string + SameNode bool + Sample float64 + Summary []float64 +} + +type ScenarioResults struct { + Results []Data +} + var NetReg = regexp.MustCompile(`\s+\d+\s+\d+\s+(\d+|\S+)\s+(\S+|\d+)\s+(\S+)+\s+(\S+)?`) func average(vals []float64) (float64, error) { @@ -21,8 +33,8 @@ func percentile(vals []float64, ptile float64) (float64, error) { } func checkResults(s ScenarioResults, check string) bool { - for t := range s.results { - if strings.Contains(s.results[t].Profile, check) { + for t := range s.Results { + if strings.Contains(s.Results[t].Profile, check) { return true } } @@ -35,7 +47,7 @@ func ShowStreamResult(s ScenarioResults) { fmt.Printf("%s Stream Results %s\r\n", strings.Repeat("-", 59), strings.Repeat("-", 59)) fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg value") fmt.Printf("%s\r\n", strings.Repeat("-", 136)) - for _, r := range s.results { + for _, r := range s.Results { if strings.Contains(r.Profile, "STREAM") { avg, _ := average(r.Summary) fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric) @@ -45,15 +57,18 @@ func ShowStreamResult(s ScenarioResults) { } } +// ShowRRResults will display the RR performance results +// Currently showing the Avg Value. 
+// TODO: Capture latency values func ShowRRResult(s ScenarioResults) { if checkResults(s, "RR") { fmt.Printf("%s RR Results %s\r\n", strings.Repeat("-", 62), strings.Repeat("-", 62)) - fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "99%tile value") + fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg value") fmt.Printf("%s\r\n", strings.Repeat("-", 136)) - for _, r := range s.results { + for _, r := range s.Results { if strings.Contains(r.Profile, "RR") { - pct, _ := percentile(r.Summary, 99) - fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, pct, r.Metric) + avg, _ := average(r.Summary) + fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric) } } fmt.Printf("%s\r\n", strings.Repeat("-", 136)) @@ -63,7 +78,7 @@ func ShowRRResult(s ScenarioResults) { // ParseResults accepts the stdout from the execution of the benchmark. It also needs // The NetPerfConfig to determine aspects of the workload the user provided. // It will return a NetPerfResults struct or error -func ParseResults(stdout *bytes.Buffer, nc NetPerfConfig) (float64, error) { +func ParseResults(stdout *bytes.Buffer) (float64, error) { d := NetReg.FindStringSubmatch(stdout.String()) if len(d) < 5 { return 0, fmt.Errorf("❌ Unable to process results")
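
Note (illustrative, not part of the patch): the commit message explains that a pod being deleted keeps status.phase=Running until teardown completes, so the field selector alone still returns "terminating" pods and DeletionTimestamp has to be checked as well. The sketch below isolates that filter with client-go; the package, function, and variable names here are hypothetical, and only the List call and DeletionTimestamp check mirror what the diff does in GetPods.

package main

import (
	"context"
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// listRunningPods returns pods that are Running AND not marked for deletion.
// A deleted pod stays in phase Running while it terminates, so filtering on
// status.phase=Running is not enough; DeletionTimestamp must also be nil.
func listRunningPods(c *kubernetes.Clientset, namespace, selector string) (apiv1.PodList, error) {
	out := apiv1.PodList{}
	pods, err := c.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: selector,
		FieldSelector: "status.phase=Running",
	})
	if err != nil {
		return out, err
	}
	for _, pod := range pods.Items {
		if pod.DeletionTimestamp != nil {
			// Pod is terminating; skip it so the benchmark never execs into a dying pod.
			continue
		}
		out.Items = append(out.Items, pod)
	}
	return out, nil
}

func main() {
	// Hypothetical wiring for a standalone run; the patch builds its client the same general way.
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}).ClientConfig()
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	pods, err := listRunningPods(client, "netperf", "role=client")
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d usable client pod(s)\n", len(pods.Items))
}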