Skip to content

Commit

Permalink
Add service support. (#8)
Browse files Browse the repository at this point in the history
It can be enabled by setting 'service: True' in the profile definition
(see CRRService in netperf.yml).

For now the server control and data ports are fixed:
  const serverCtlPort = 12865
  const serverDataPort = 42424

Signed-off-by: Dumitru Ceara <[email protected]>

Signed-off-by: Dumitru Ceara <[email protected]>
  • Loading branch information
dceara authored Sep 8, 2022
1 parent 76f567a commit 4ecb8dd
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 20 deletions.
42 changes: 42 additions & 0 deletions kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -123,3 +124,44 @@ func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (*apiv1.PodList, erro
}
return pods, nil
}

func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Service, error) {
s, err := client.CoreV1().Services(sp.namespace).Get(context.TODO(), sp.name, metav1.GetOptions{})
if err == nil {
fmt.Println("♻️ Using existing Deployment")
return s, nil
}
fmt.Printf("πŸš€ Creating service for %s in %s\n", sp.name, sp.namespace)
sc := client.CoreV1().Services(sp.namespace)
service := &apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: sp.name,
Namespace: sp.namespace,
},
Spec: apiv1.ServiceSpec{
Ports: []apiv1.ServicePort{
{
Name: fmt.Sprintf("%s-ctl", sp.name),
Protocol: apiv1.ProtocolTCP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.ctlPort)),
Port: sp.ctlPort,
},
{
Name: fmt.Sprintf("%s-data-tcp", sp.name),
Protocol: apiv1.ProtocolTCP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)),
Port: sp.dataPort,
},
{
Name: fmt.Sprintf("%s-data-udp", sp.name),
Protocol: apiv1.ProtocolUDP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.dataPort)),
Port: sp.dataPort,
},
},
Type: apiv1.ServiceType("ClusterIP"),
Selector: sp.labels,
},
}
return sc.Create(context.TODO(), service, metav1.CreateOptions{})
}
53 changes: 43 additions & 10 deletions netperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ type NetPerfConfig struct {
Profile string `yaml:"profile,omitempty"`
Samples int `yaml:"samples,omitempty"`
MessageSize int `yaml:"messagesize,omitempty"`
Service bool `default:"false" yaml:"service,omitempty"`
}

type PerfScenarios struct {
configs []NetPerfConfig
client *apiv1.PodList
server *apiv1.PodList
clientAcross *apiv1.PodList
service *apiv1.Service
restconfig rest.Config
}

Expand All @@ -60,6 +62,17 @@ type DeploymentParams struct {
port int
}

type ServiceParams struct {
name string
namespace string
labels map[string]string
ctlPort int32
dataPort int32
}

const serverCtlPort = 12865
const serverDataPort = 42424

func parseConf(fn string) ([]NetPerfConfig, error) {
fmt.Printf("πŸ“’ Reading %s file.\r\n", fn)
buf, err := ioutil.ReadFile(fn)
Expand Down Expand Up @@ -131,7 +144,7 @@ func main() {
image: "quay.io/jtaleric/k8snetperf:latest",
labels: map[string]string{"role": "client"},
command: []string{"/bin/bash", "-c", "sleep 10000000"},
port: 12865,
port: serverCtlPort,
}
if z != "" {
cdp.nodeaffinity = apiv1.NodeAffinity{
Expand Down Expand Up @@ -165,6 +178,21 @@ func main() {
os.Exit(1)
}

// Create netperf TCP service
spTcp := ServiceParams{
name: "netperf-service",
namespace: "netperf",
labels: map[string]string{"role": "server"},
ctlPort: serverCtlPort,
dataPort: serverDataPort,
}
s.service, err = CreateService(spTcp, client)
if err != nil {
fmt.Println("πŸ˜₯ Unable to create TCP netperf service")
log.Error(err)
os.Exit(1)
}

// Start netperf server
sdp := DeploymentParams{
name: "server",
Expand All @@ -185,7 +213,7 @@ func main() {
},
},
},
port: 12865,
port: serverCtlPort,
}
if z != "" {
sdp.nodeaffinity = apiv1.NodeAffinity{
Expand Down Expand Up @@ -241,7 +269,7 @@ func main() {
image: "quay.io/jtaleric/k8snetperf:latest",
labels: map[string]string{"role": "client-across"},
command: []string{"/bin/bash", "-c", "sleep 10000000"},
port: 12865,
port: serverCtlPort,
}
if z != "" {
cdp_across.nodeaffinity = apiv1.NodeAffinity{
Expand Down Expand Up @@ -287,13 +315,19 @@ func main() {
if strings.Contains(nc.Profile, "STREAM") {
metric = "Mb/s"
}
var serverIP string
if nc.Service {
serverIP = s.service.Spec.ClusterIP
} else {
serverIP = s.server.Items[0].Status.PodIP
}
if s.clientAcross != nil {
npr := NetPerfResult{}
npr.NetPerfConfig = nc
npr.Metric = metric
npr.SameNode = false
for i := 0; i < nc.Samples; i++ {
r, err := RunNetPerf(client, s.restconfig, nc, s.clientAcross, s.server)
r, err := RunNetPerf(client, s.restconfig, nc, s.clientAcross, serverIP)
if err != nil {
log.Error(err)
os.Exit(1)
Expand All @@ -314,7 +348,7 @@ func main() {
npr.Metric = metric
npr.SameNode = true
for i := 0; i < nc.Samples; i++ {
r, err := RunNetPerf(client, s.restconfig, nc, s.client, s.server)
r, err := RunNetPerf(client, s.restconfig, nc, s.client, serverIP)
if err != nil {
log.Error(err)
os.Exit(1)
Expand All @@ -334,18 +368,17 @@ func main() {

// Display the netperf config
func ShowConfig(c NetPerfConfig) {
fmt.Printf("πŸ—’οΈ Running netperf %s for %ds\r\n", c.Profile, c.Duration)
fmt.Printf("πŸ—’οΈ Running netperf %s (service %t) for %ds\r\n", c.Profile, c.Service, c.Duration)
}

// RunNetPerf will use the k8s client to run the netperf binary in the container image
// it will return a bytes.Buffer of the stdout.
func RunNetPerf(c *kubernetes.Clientset, rc rest.Config, nc NetPerfConfig, client *apiv1.PodList, server *apiv1.PodList) (bytes.Buffer, error) {
func RunNetPerf(c *kubernetes.Clientset, rc rest.Config, nc NetPerfConfig, client *apiv1.PodList, serverIP string) (bytes.Buffer, error) {
var stdout, stderr bytes.Buffer
sip := server.Items[0].Status.PodIP
pod := client.Items[0]
fmt.Printf("πŸ”₯ Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, sip)
fmt.Printf("πŸ”₯ Client (%s,%s) starting netperf against server : %s\n", pod.Name, pod.Status.PodIP, serverIP)
ShowConfig(nc)
cmd := []string{"/usr/local/bin/netperf", "-H", sip, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize)}
cmd := []string{"/usr/local/bin/netperf", "-H", serverIP, "-l", fmt.Sprintf("%d", nc.Duration), "-t", nc.Profile, "--", "-R", "1", "-m", fmt.Sprintf("%d", nc.MessageSize), "-P", fmt.Sprintf("0,%d", serverDataPort)}
req := c.CoreV1().RESTClient().
Post().
Namespace(pod.Namespace).
Expand Down
7 changes: 7 additions & 0 deletions netperf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ CRR:
samples: 1
messagesize: 16384

CRRService:
profile: "TCP_CRR"
duration: 10
samples: 1
messagesize: 16384
service: True

RR:
profile: "TCP_RR"
duration: 10
Expand Down
20 changes: 10 additions & 10 deletions result.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@ func checkResults(s ScenarioResults, check string) bool {
// ShowStreamResults accepts NetPerfResults to display to the user via stdout
func ShowStreamResult(s ScenarioResults) {
if checkResults(s, "STREAM") {
fmt.Printf("%s Stream Results %s\r\n", strings.Repeat("-", (51)), strings.Repeat("-", (51)))
fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Message Size", "Same node", "Duration", "Samples", "Avg value")
fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75)))
fmt.Printf("%s Stream Results %s\r\n", strings.Repeat("-", 59), strings.Repeat("-", 59))
fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg value")
fmt.Printf("%s\r\n", strings.Repeat("-", 136))
for _, r := range s.results {
if strings.Contains(r.Profile, "STREAM") {
avg, _ := average(r.Summary)
fmt.Printf("πŸ“Š %-15s | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric)
fmt.Printf("πŸ“Š %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, avg, r.Metric)
}
}
fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75)))
fmt.Printf("%s\r\n", strings.Repeat("-", 136))
}
}

func ShowRRResult(s ScenarioResults) {
if checkResults(s, "RR") {
fmt.Printf("%s RR Results %s\r\n", strings.Repeat("-", (53)), strings.Repeat("-", (53)))
fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Message Size", "Same node", "Duration", "Samples", "99%tile value")
fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75)))
fmt.Printf("%s RR Results %s\r\n", strings.Repeat("-", 62), strings.Repeat("-", 62))
fmt.Printf("%-18s | %-15s | %-15s | %-15s | %-15s | %-15s | %-15s\r\n", "Scenario", "Service", "Message Size", "Same node", "Duration", "Samples", "99%tile value")
fmt.Printf("%s\r\n", strings.Repeat("-", 136))
for _, r := range s.results {
if strings.Contains(r.Profile, "RR") {
pct, _ := percentile(r.Summary, 99)
fmt.Printf("πŸ“Š %-15s | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.MessageSize, r.SameNode, r.Duration, r.Samples, pct, r.Metric)
fmt.Printf("πŸ“Š %-15s | %-15t | %-15d | %-15t | %-15d | %-15d | %-15f (%s) \r\n", r.Profile, r.Service, r.MessageSize, r.SameNode, r.Duration, r.Samples, pct, r.Metric)
}
}
fmt.Printf("%s\r\n", strings.Repeat("-", (18+25+75)))
fmt.Printf("%s\r\n", strings.Repeat("-", 136))
}
}

Expand Down

0 comments on commit 4ecb8dd

Please sign in to comment.