diff --git a/archive.go b/archive.go index ab4efb38..2665fe15 100644 --- a/archive.go +++ b/archive.go @@ -15,6 +15,8 @@ type ElasticParams struct { index string } +// Connect accepts ElasticParams which describe how to connect to ES. +// Returns a client connected to the desired ES Cluster. func Connect(es ElasticParams) (*elasticsearch.Client, error) { log.Infof("Connecting to ES - %s", es.url) esc := elasticsearch.Config{ diff --git a/netperf.go b/netperf.go index aa4565fb..a3e4b4b2 100644 --- a/netperf.go +++ b/netperf.go @@ -1,102 +1,24 @@ package main import ( - "bytes" "context" "flag" - "fmt" - "io/ioutil" "os" "strings" + "gihub.com/jtaleric/k8s-netperf/netperf" log "github.com/sirupsen/logrus" - "gopkg.in/yaml.v3" - apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/remotecommand" ) -type NetPerfResult struct { - NetPerfConfig - Metric string - SameNode bool - Sample float64 - Summary []float64 -} - -type ScenarioResults struct { - results []NetPerfResult -} - -type NetPerfConfig struct { - Duration int `yaml:"duration,omitempty"` - Profile string `yaml:"profile,omitempty"` - Samples int `yaml:"samples,omitempty"` - MessageSize int `yaml:"messagesize,omitempty"` - Service bool `default:"false" yaml:"service,omitempty"` -} - -type PerfScenarios struct { - configs []NetPerfConfig - client *apiv1.PodList - server *apiv1.PodList - clientAcross *apiv1.PodList - service *apiv1.Service - restconfig rest.Config -} - -type DeploymentParams struct { - name string - namespace string - replicas int32 - image string - labels map[string]string - command []string - podaffinity apiv1.PodAffinity - podantiaffinity apiv1.PodAntiAffinity - nodeaffinity apiv1.NodeAffinity - port int -} - -type ServiceParams struct { - name string - namespace string - labels map[string]string - ctlPort int32 - dataPort int32 -} - -const serverCtlPort = 12865 -const serverDataPort = 42424 - -func parseConf(fn string) ([]NetPerfConfig, error) { - fmt.Printf("📒 Reading %s file.\r\n", fn) - buf, err := ioutil.ReadFile(fn) - if err != nil { - return nil, err - } - c := make(map[string]NetPerfConfig) - err = yaml.Unmarshal(buf, &c) - if err != nil { - return nil, fmt.Errorf("In file %q: %v", fn, err) - } - // Ignore the key - // Pull out the specific tests - var tests []NetPerfConfig - for _, value := range c { - tests = append(tests, value) - } - return tests, nil -} - func main() { cfgfile := flag.String("config", "netperf.yml", "K8s netperf Configuration File") + nl := flag.Bool("local", false, "Run Netperf with pod/server on the same node") + hn := flag.Bool("hostnet", false, "Run Netperf with pod/server on hostnet (must have two worker nodes)") flag.Parse() - cfg, err := parseConf(*cfgfile) + cfg, err := netperf.ParseConf(*cfgfile) if err != nil { log.Error(err) os.Exit(1) @@ -116,200 +38,33 @@ func main() { log.Fatal(err) os.Exit(1) } - s := PerfScenarios{ - restconfig: *rconfig, - configs: cfg, - } - - // Check if nodes have the zone label to keep the netperf test - // in the same AZ/Zone versus across AZ/Zone - z, err := GetZone(client) - if err != nil { - log.Warn(err) + s := netperf.PerfScenarios{ + NodeLocal: *nl, + HostNetwork: *hn, + RestConfig: *rconfig, + Configs: cfg, } - // Get node count nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) if err != nil { - log.Fatal(err) - os.Exit(1) - } - ncount := len(nodes.Items) - - // Create Netperf client on the same node as the server. - cdp := DeploymentParams{ - name: "client", - namespace: "netperf", - replicas: 1, - image: "quay.io/jtaleric/k8snetperf:latest", - labels: map[string]string{"role": "client"}, - command: []string{"/bin/bash", "-c", "sleep 10000000"}, - port: serverCtlPort, - } - if z != "" { - cdp.nodeaffinity = apiv1.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ - { - Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, - }, - }, - }, - }, - } - } - _, err = CreateDeployment(cdp, client) - if err != nil { - log.Error("Unable to create Client deployment") - os.Exit(1) - } - // Wait for pod(s) in the deployment to be ready - _, err = WaitForReady(client, cdp) - if err != nil { - log.Error(err) - os.Exit(1) - } - // Retrieve the pod information - s.client, err = GetPods(client, cdp) - if err != nil { - log.Error(err) - os.Exit(1) - } - - // Create netperf TCP service - spTcp := ServiceParams{ - name: "netperf-service", - namespace: "netperf", - labels: map[string]string{"role": "server"}, - ctlPort: serverCtlPort, - dataPort: serverDataPort, - } - s.service, err = CreateService(spTcp, client) - if err != nil { - fmt.Println("😥 Unable to create TCP netperf service") - log.Error(err) - os.Exit(1) - } - - // Start netperf server - sdp := DeploymentParams{ - name: "server", - namespace: "netperf", - replicas: 1, - image: "quay.io/jtaleric/k8snetperf:latest", - labels: map[string]string{"role": "server"}, - command: []string{"/bin/bash", "-c", "netserver; sleep 10000000"}, - podaffinity: apiv1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client"}}, - }, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - port: serverCtlPort, - } - if z != "" { - sdp.nodeaffinity = apiv1.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ - { - Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, - }, - }, - }, - }, - } - } - if ncount > 1 { - sdp.podantiaffinity = apiv1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client-across"}}, - }, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - } - - } - _, err = CreateDeployment(sdp, client) - if err != nil { - fmt.Println("😥 Unable to create Server deployment") log.Error(err) os.Exit(1) } - _, err = WaitForReady(client, sdp) + err = netperf.BuildSUT(client, &s) if err != nil { log.Error(err) os.Exit(1) } - // Retrieve pods which match the server/client role labels - s.server, err = GetPods(client, sdp) - if err != nil { - log.Error(err) + if !s.NodeLocal && len(nodes.Items) < 2 { + log.Error("Node count too low to run pod to pod across nodes.") os.Exit(1) } - cdp_across := DeploymentParams{ - name: "client-across", - namespace: "netperf", - replicas: 1, - image: "quay.io/jtaleric/k8snetperf:latest", - labels: map[string]string{"role": "client-across"}, - command: []string{"/bin/bash", "-c", "sleep 10000000"}, - port: serverCtlPort, - } - if z != "" { - cdp_across.nodeaffinity = apiv1.NodeAffinity{ - PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ - { - Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, - }, - }, - }, - }, - } - } - - if ncount > 1 { - _, err = CreateDeployment(cdp_across, client) - if err != nil { - log.Error("Unable to create Client deployment") - os.Exit(1) - } - // Wait for pod(s) in the deployment to be ready - _, err = WaitForReady(client, cdp_across) - if err != nil { - log.Error(err) - os.Exit(1) - } - // Retrieve the pod information - s.clientAcross, err = GetPods(client, cdp_across) - if err != nil { - log.Error(err) - os.Exit(1) - } - } - var sr ScenarioResults + var sr netperf.ScenarioResults // Run through each test - for _, nc := range s.configs { + for _, nc := range s.Configs { // Determine the metric for the test metric := string("OP/s") if strings.Contains(nc.Profile, "STREAM") { @@ -317,95 +72,52 @@ func main() { } var serverIP string if nc.Service { - serverIP = s.service.Spec.ClusterIP + serverIP = s.Service.Spec.ClusterIP } else { - serverIP = s.server.Items[0].Status.PodIP + serverIP = s.Server.Items[0].Status.PodIP } - if s.clientAcross != nil { - npr := NetPerfResult{} - npr.NetPerfConfig = nc + if !s.NodeLocal { + npr := netperf.Data{} + npr.Config = nc npr.Metric = metric npr.SameNode = false for i := 0; i < nc.Samples; i++ { - r, err := RunNetPerf(client, s.restconfig, nc, s.clientAcross, serverIP) + r, err := netperf.Run(client, s.RestConfig, nc, s.ClientAcross, serverIP) if err != nil { log.Error(err) os.Exit(1) } - nr, err := ParseResults(&r, nc) + nr, err := netperf.ParseResults(&r) if err != nil { log.Error(err) os.Exit(1) } npr.Summary = append(npr.Summary, nr) } - sr.results = append(sr.results, npr) - } - // Reset the result as we are now testing a different scenario - // Consider breaking the result per-scenario-config - npr := NetPerfResult{} - npr.NetPerfConfig = nc - npr.Metric = metric - npr.SameNode = true - for i := 0; i < nc.Samples; i++ { - r, err := RunNetPerf(client, s.restconfig, nc, s.client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err := ParseResults(&r, nc) - if err != nil { - log.Error(err) - os.Exit(1) + sr.Results = append(sr.Results, npr) + } else { + // Reset the result as we are now testing a different scenario + // Consider breaking the result per-scenario-config + npr := netperf.Data{} + npr.Config = nc + npr.Metric = metric + npr.SameNode = true + for i := 0; i < nc.Samples; i++ { + r, err := netperf.Run(client, s.RestConfig, nc, s.Client, serverIP) + if err != nil { + log.Error(err) + os.Exit(1) + } + nr, err := netperf.ParseResults(&r) + if err != nil { + log.Error(err) + os.Exit(1) + } + npr.Summary = append(npr.Summary, nr) } - npr.Summary = append(npr.Summary, nr) + sr.Results = append(sr.Results, npr) } - sr.results = append(sr.results, npr) - } - ShowStreamResult(sr) - ShowRRResult(sr) -} - -// Display the netperf config -func ShowConfig(c NetPerfConfig) { - fmt.Printf("🗒️ Running netperf %s (service %t) for %ds\r\n", c.Profile, c.Service, c.Duration) -} - -// RunNetPerf will use the k8s client to run the netperf binary in the container image -// it will return a bytes.Buffer of the stdout. -func RunNetPerf(c *kubernetes.Clientset, rc rest.Config, nc NetPerfConfig, client *apiv1.PodList, serverIP string) (bytes.Buffer, error) { - var stdout, stderr bytes.Buffer - pod := client.Items[0] - fmt.Printf("🔥 Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, serverIP) - ShowConfig(nc) - cmd := []string{"/usr/local/bin/netperf", "-H", serverIP, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize), "-P", fmt.Sprintf("0,%d", serverDataPort)} - req := c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return stdout, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.Stream(remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return stdout, err } - // Sound check stderr - return stdout, nil + netperf.ShowStreamResult(sr) + netperf.ShowRRResult(sr) } diff --git a/kubernetes.go b/netperf/kubernetes.go similarity index 62% rename from kubernetes.go rename to netperf/kubernetes.go index b87955a9..c74ec557 100644 --- a/kubernetes.go +++ b/netperf/kubernetes.go @@ -1,4 +1,4 @@ -package main +package netperf import ( "context" @@ -15,7 +15,7 @@ import ( // It will return a bool based on if the pods ever become ready before we move on. func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) { fmt.Println("⏰ Checking for Pods to become ready...") - dw, err := c.AppsV1().Deployments(dp.namespace).Watch(context.TODO(), metav1.ListOptions{}) + dw, err := c.AppsV1().Deployments(dp.Namespace).Watch(context.TODO(), metav1.ListOptions{}) if err != nil { return false, err } @@ -25,7 +25,7 @@ func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) { if !ok { fmt.Println("❌ Issue with the Deployment") } - if d.Name == dp.name { + if d.Name == dp.Name { if d.Status.ReadyReplicas == 1 { return true, nil } @@ -64,40 +64,40 @@ func GetZone(c *kubernetes.Clientset) (string, error) { } func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv1.Deployment, error) { - d, err := client.AppsV1().Deployments(dp.namespace).Get(context.TODO(), dp.name, metav1.GetOptions{}) + d, err := client.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) if err == nil { if d.Status.ReadyReplicas > 0 { fmt.Println("♻️ Using existing Deployment") return d, nil } } - fmt.Printf("🚀 Starting Deployment for %s in %s\n", dp.name, dp.namespace) - dc := client.AppsV1().Deployments(dp.namespace) + fmt.Printf("🚀 Starting Deployment for %s in %s\n", dp.Name, dp.Namespace) + dc := client.AppsV1().Deployments(dp.Namespace) deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: dp.name, + Name: dp.Name, }, Spec: appsv1.DeploymentSpec{ - Replicas: &dp.replicas, + Replicas: &dp.Replicas, Selector: &metav1.LabelSelector{ - MatchLabels: dp.labels, + MatchLabels: dp.Labels, }, Template: apiv1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: dp.labels, + Labels: dp.Labels, }, Spec: apiv1.PodSpec{ Containers: []apiv1.Container{ { - Name: dp.name, - Image: dp.image, - Command: dp.command, + Name: dp.Name, + Image: dp.Image, + Command: dp.Command, }, }, Affinity: &apiv1.Affinity{ - NodeAffinity: &dp.nodeaffinity, - PodAffinity: &dp.podaffinity, - PodAntiAffinity: &dp.podantiaffinity, + NodeAffinity: &dp.NodeAffinity, + PodAffinity: &dp.PodAffinity, + PodAntiAffinity: &dp.PodAntiAffinity, }, }, }, @@ -109,58 +109,66 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv // GetPods searches for a specific set of pods from DeploymentParms // It returns a PodList if the deployment is found. // NOTE : Since we can update the replicas to be > 1, is why I return a PodList. -func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (*apiv1.PodList, error) { - d, err := c.AppsV1().Deployments(dp.namespace).Get(context.TODO(), dp.name, metav1.GetOptions{}) +func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (apiv1.PodList, error) { + d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) + npl := apiv1.PodList{} if err != nil { - return nil, fmt.Errorf("❌ Failure to capture deployment") + return npl, fmt.Errorf("❌ Failure to capture deployment") } selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) if err != nil { - return nil, fmt.Errorf("❌ Failure to capture deployment label") + return npl, fmt.Errorf("❌ Failure to capture deployment label") } - pods, err := c.CoreV1().Pods(dp.namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"}) + pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"}) if err != nil { - return nil, fmt.Errorf("❌ Failure to capture pods") + return npl, fmt.Errorf("❌ Failure to capture pods") } - return pods, nil + for pod := range pods.Items { + if pods.Items[pod].DeletionTimestamp != nil { + continue + } else { + npl.Items = append(npl.Items, pods.Items[pod]) + } + } + return npl, nil } func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Service, error) { - s, err := client.CoreV1().Services(sp.namespace).Get(context.TODO(), sp.name, metav1.GetOptions{}) + s, err := client.CoreV1().Services(sp.Namespace).Get(context.TODO(), sp.Name, metav1.GetOptions{}) if err == nil { - fmt.Println("♻️ Using existing Deployment") + fmt.Println("♻️ Using existing Service") return s, nil } - fmt.Printf("🚀 Creating service for %s in %s\n", sp.name, sp.namespace) - sc := client.CoreV1().Services(sp.namespace) + fmt.Printf("🚀 Creating service for %s in %s\n", sp.Name, sp.Namespace) + sc := client.CoreV1().Services(sp.Namespace) service := &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: sp.name, - Namespace: sp.namespace, + Name: sp.Name, + Namespace: sp.Namespace, }, Spec: apiv1.ServiceSpec{ Ports: []apiv1.ServicePort{ { - Name: fmt.Sprintf("%s-ctl", sp.name), + Name: fmt.Sprintf("%s-ctl", sp.Name), Protocol: apiv1.ProtocolTCP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.ctlPort)), - Port: sp.ctlPort, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.CtlPort)), + Port: sp.CtlPort, }, { - Name: fmt.Sprintf("%s-data-tcp", sp.name), + Name: fmt.Sprintf("%s-data-tcp", sp.Name), Protocol: apiv1.ProtocolTCP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)), - Port: sp.dataPort, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)), + Port: sp.DataPort, }, { - Name: fmt.Sprintf("%s-data-udp", sp.name), + Name: fmt.Sprintf("%s-data-udp", sp.Name), Protocol: apiv1.ProtocolUDP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)), - Port: sp.dataPort, + TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)), + Port: sp.DataPort, }, }, Type: apiv1.ServiceType("ClusterIP"), - Selector: sp.labels, + Selector: sp.Labels, }, } return sc.Create(context.TODO(), service, metav1.CreateOptions{}) diff --git a/netperf/netperf.go b/netperf/netperf.go new file mode 100644 index 00000000..a6d387b9 --- /dev/null +++ b/netperf/netperf.go @@ -0,0 +1,308 @@ +package netperf + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + + "gopkg.in/yaml.v2" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +type Config struct { + Duration int `yaml:"duration,omitempty"` + Profile string `yaml:"profile,omitempty"` + Samples int `yaml:"samples,omitempty"` + MessageSize int `yaml:"messagesize,omitempty"` + Service bool `default:"false" yaml:"service,omitempty"` +} + +type DeploymentParams struct { + Name string + Namespace string + Replicas int32 + Image string + Labels map[string]string + Command []string + PodAffinity apiv1.PodAffinity + PodAntiAffinity apiv1.PodAntiAffinity + NodeAffinity apiv1.NodeAffinity + Port int +} + +type PerfScenarios struct { + NodeLocal bool + HostNetwork bool + Configs []Config + Client apiv1.PodList + Server apiv1.PodList + ClientAcross apiv1.PodList + Service *apiv1.Service + RestConfig rest.Config +} + +type ServiceParams struct { + Name string + Namespace string + Labels map[string]string + CtlPort int32 + DataPort int32 +} + +const ServerCtlPort = 12865 +const ServerDataPort = 42424 + +func BuildSUT(client *kubernetes.Clientset, s *PerfScenarios) error { + // Check if nodes have the zone label to keep the netperf test + // in the same AZ/Zone versus across AZ/Zone + z, err := GetZone(client) + if err != nil { + fmt.Println(err) + } + // Get node count + nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) + if err != nil { + return err + } + ncount := len(nodes.Items) + + if s.NodeLocal { + // Create Netperf client on the same node as the server. + cdp := DeploymentParams{ + Name: "client", + Namespace: "netperf", + Replicas: 1, + Image: "quay.io/jtaleric/k8snetperf:latest", + Labels: map[string]string{"role": "client"}, + Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Port: ServerCtlPort, + } + if z != "" { + cdp.NodeAffinity = apiv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: apiv1.NodeSelectorTerm{ + MatchExpressions: []apiv1.NodeSelectorRequirement{ + {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, + }, + }, + }, + }, + } + } + _, err = CreateDeployment(cdp, client) + if err != nil { + return fmt.Errorf("Unable to create Client deployment") + } + // Wait for pod(s) in the deployment to be ready + _, err = WaitForReady(client, cdp) + if err != nil { + return err + } + // Retrieve the pod information + s.Client, err = GetPods(client, cdp) + if err != nil { + return err + } + } + + // Create netperf TCP service + spTcp := ServiceParams{ + Name: "netperf-service", + Namespace: "netperf", + Labels: map[string]string{"role": "server"}, + CtlPort: ServerCtlPort, + DataPort: ServerDataPort, + } + s.Service, err = CreateService(spTcp, client) + if err != nil { + return fmt.Errorf("😥 Unable to create TCP netperf service") + } + cdp_across := DeploymentParams{ + Name: "client-across", + Namespace: "netperf", + Replicas: 1, + Image: "quay.io/jtaleric/k8snetperf:latest", + Labels: map[string]string{"role": "client-across"}, + Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Port: ServerCtlPort, + } + if z != "" { + cdp_across.NodeAffinity = apiv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: apiv1.NodeSelectorTerm{ + MatchExpressions: []apiv1.NodeSelectorRequirement{ + {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, + }, + }, + }, + }, + } + } + + if ncount > 1 { + _, err = CreateDeployment(cdp_across, client) + if err != nil { + return fmt.Errorf("Unable to create Client deployment") + } + // Wait for pod(s) in the deployment to be ready + _, err = WaitForReady(client, cdp_across) + if err != nil { + return err + } + // Retrieve the pod information + s.ClientAcross, err = GetPods(client, cdp_across) + if err != nil { + return err + } + } + + // Start netperf server + sdp := DeploymentParams{ + Name: "server", + Namespace: "netperf", + Replicas: 1, + Image: "quay.io/jtaleric/k8snetperf:latest", + Labels: map[string]string{"role": "server"}, + Command: []string{"/bin/bash", "-c", "netserver; sleep 10000000"}, + PodAffinity: apiv1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: apiv1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client"}}, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Port: ServerCtlPort, + } + if z != "" { + sdp.NodeAffinity = apiv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: apiv1.NodeSelectorTerm{ + MatchExpressions: []apiv1.NodeSelectorRequirement{ + {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{z}}, + }, + }, + }, + }, + } + } + if ncount > 1 { + sdp.PodAntiAffinity = apiv1.PodAntiAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []apiv1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: apiv1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: "role", Operator: metav1.LabelSelectorOpIn, Values: []string{"client-across"}}, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + } + + } + _, err = CreateDeployment(sdp, client) + if err != nil { + return fmt.Errorf("😥 Unable to create Server deployment") + } + _, err = WaitForReady(client, sdp) + if err != nil { + return err + } + // Retrieve pods which match the server/client role labels + s.Server, err = GetPods(client, sdp) + if err != nil { + return err + } + + return nil +} + +// ParseConfig will read in the netperf configuration file which +// describes which tests to run +// Returns Config struct +func ParseConf(fn string) ([]Config, error) { + fmt.Printf("📒 Reading %s file.\r\n", fn) + buf, err := ioutil.ReadFile(fn) + if err != nil { + return nil, err + } + c := make(map[string]Config) + err = yaml.Unmarshal(buf, &c) + if err != nil { + return nil, fmt.Errorf("In file %q: %v", fn, err) + } + // Ignore the key + // Pull out the specific tests + var tests []Config + for _, value := range c { + tests = append(tests, value) + } + return tests, nil +} + +// Display the netperf config +func ShowConfig(c Config) { + fmt.Printf("🗒️ Running netperf %s (service %t) for %ds\r\n", c.Profile, c.Service, c.Duration) +} + +// RunNetPerf will use the k8s client to run the netperf binary in the container image +// it will return a bytes.Buffer of the stdout. +func Run(c *kubernetes.Clientset, rc rest.Config, nc Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { + var stdout, stderr bytes.Buffer + pod := client.Items[0] + fmt.Printf("🔥 Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, serverIP) + ShowConfig(nc) + cmd := []string{"/usr/local/bin/netperf", "-H", serverIP, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize), "-P", fmt.Sprintf("0,%d", ServerDataPort)} + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + // Sound check stderr + return stdout, nil +} diff --git a/result.go b/netperf/result.go similarity index 79% rename from result.go rename to netperf/result.go index 7042be12..a4b727e7 100644 --- a/result.go +++ b/netperf/result.go @@ -1,4 +1,4 @@ -package main +package netperf import ( "bytes" @@ -10,6 +10,18 @@ import ( stats "github.com/montanaflynn/stats" ) +type Data struct { + Config + Metric string + SameNode bool + Sample float64 + Summary []float64 +} + +type ScenarioResults struct { + Results []Data +} + var NetReg = regexp.MustCompile(`\s+\d+\s+\d+\s+(\d+|\S+)\s+(\S+|\d+)\s+(\S+)+\s+(\S+)?`) func average(vals []float64) (float64, error) { @@ -21,8 +33,8 @@ func percentile(vals []float64, ptile float64) (float64, error) { } func checkResults(s ScenarioResults, check string) bool { - for t := range s.results { - if strings.Contains(s.results[t].Profile, check) { + for t := range s.Results { + if strings.Contains(s.Results[t].Profile, check) { return true } } @@ -35,7 +47,7 @@ func ShowStreamResult(s ScenarioResults) { fmt.Printf("%s Stream Results %s\r\n", strings.Repeat("-", 59), strings.Repeat("-", 59)) fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg value") fmt.Printf("%s\r\n", strings.Repeat("-", 136)) - for _, r := range s.results { + for _, r := range s.Results { if strings.Contains(r.Profile, "STREAM") { avg, _ := average(r.Summary) fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric) @@ -45,15 +57,18 @@ func ShowStreamResult(s ScenarioResults) { } } +// ShowRRResults will display the RR performance results +// Currently showing the Avg Value. +// TODO: Capture latency values func ShowRRResult(s ScenarioResults) { if checkResults(s, "RR") { fmt.Printf("%s RR Results %s\r\n", strings.Repeat("-", 62), strings.Repeat("-", 62)) - fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "99%tile value") + fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg value") fmt.Printf("%s\r\n", strings.Repeat("-", 136)) - for _, r := range s.results { + for _, r := range s.Results { if strings.Contains(r.Profile, "RR") { - pct, _ := percentile(r.Summary, 99) - fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, pct, r.Metric) + avg, _ := average(r.Summary) + fmt.Printf("📊 %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric) } } fmt.Printf("%s\r\n", strings.Repeat("-", 136)) @@ -63,7 +78,7 @@ func ShowRRResult(s ScenarioResults) { // ParseResults accepts the stdout from the execution of the benchmark. It also needs // The NetPerfConfig to determine aspects of the workload the user provided. // It will return a NetPerfResults struct or error -func ParseResults(stdout *bytes.Buffer, nc NetPerfConfig) (float64, error) { +func ParseResults(stdout *bytes.Buffer) (float64, error) { d := NetReg.FindStringSubmatch(stdout.String()) if len(d) < 5 { return 0, fmt.Errorf("❌ Unable to process results")