Merge pull request #694 from cheney-lin/dev/report_numa_cpu
feat: report NUMA cpu usage
luomingmeng authored Oct 19, 2024
2 parents e9504f1 + 22e8f39 commit 0a3785c
Showing 19 changed files with 395 additions and 80 deletions.
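At a high level, this PR teaches the sysadvisor node-metrics reporter to publish per-NUMA CPU usage next to the existing per-NUMA memory usage: getNodeMetricInfo gains a getNodeNUMACPUUsage call, getPodUsage additionally returns per-NUMA usage and the pod's assigned NUMA set, and getGroupUsage aggregates those into []nodeapis.NUMAMetricInfo per QoS group. As a supporting change, the fake metrics fetcher's SetContainerNumaMetric now takes the NUMA ID as an int, which is why the test hunks below drop strconv.Itoa. A minimal, self-contained sketch of the payload shape being reported (simplified stand-ins for the nodeapis types, whose fields are inferred from their use in this diff):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// resourceMetric and numaMetricInfo are simplified stand-ins for
// nodeapis.ResourceMetric and nodeapis.NUMAMetricInfo as used in this diff.
type resourceMetric struct {
	CPU    *resource.Quantity
	Memory *resource.Quantity
}

type numaMetricInfo struct {
	NUMAId int
	Usage  *resourceMetric
}

func main() {
	cpu := resource.MustParse("2")    // cores used on this NUMA node
	mem := resource.MustParse("40Gi") // memory used on this NUMA node
	info := numaMetricInfo{NUMAId: 0, Usage: &resourceMetric{CPU: &cpu, Memory: &mem}}
	fmt.Printf("numa%d: cpu=%s, memory=%s\n", info.NUMAId, info.Usage.CPU.String(), info.Usage.Memory.String())
}
```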
@@ -18,7 +18,6 @@ package memory

import (
"context"
"strconv"
"testing"
"time"

@@ -311,7 +310,7 @@ func TestNumaMemoryPressurePlugin_GetTopEvictionPods(t *testing.T) {
now := time.Now()
for i, pod := range bePods {
for numaID, usage := range bePodUsageNuma[i] {
fakeMetricsFetcher.SetContainerNumaMetric(string(pod.UID), pod.Spec.Containers[0].Name, strconv.Itoa(numaID), consts.MetricsMemTotalPerNumaContainer, utilMetric.MetricData{Value: usage, Time: &now})
fakeMetricsFetcher.SetContainerNumaMetric(string(pod.UID), pod.Spec.Containers[0].Name, numaID, consts.MetricsMemTotalPerNumaContainer, utilMetric.MetricData{Value: usage, Time: &now})
}
}
for numaID, numaTotal := range numaTotalMap {
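The hunk above is the consumer side of the SetContainerNumaMetric signature change: the NUMA node is now an int, not a stringified ID. A runnable sketch of the before/after call shape, using stand-in fetcher and metric types since the real ones live in katalyst's metric utilities:

```go
package main

import (
	"fmt"
	"time"
)

// metricData stands in for katalyst's utilmetric.MetricData.
type metricData struct {
	Value float64
	Time  *time.Time
}

// fakeFetcher mimics the updated fake metrics fetcher: the NUMA node is an
// int parameter, so callers drop the strconv.Itoa conversion they used to need.
type fakeFetcher struct{}

func (f *fakeFetcher) SetContainerNumaMetric(podUID, container string, numaID int, metric string, data metricData) {
	fmt.Printf("pod=%s ctr=%s numa=%d %s=%.0f\n", podUID, container, numaID, metric, data.Value)
}

func main() {
	now := time.Now()
	f := &fakeFetcher{}
	// Before: f.SetContainerNumaMetric("uid1", "container1", strconv.Itoa(0), ...)
	// After:
	f.SetContainerNumaMetric("uid1", "container1", 0, "mem.total.numa.container", metricData{Value: 1 << 30, Time: &now})
}
```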
141 changes: 132 additions & 9 deletions pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go
@@ -46,6 +46,7 @@ import (
schedutil "github.com/kubewharf/katalyst-core/pkg/scheduler/util"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

@@ -309,6 +310,14 @@ func (p *nodeMetricsReporterPlugin) getNodeMetricInfo() (*nodeapis.NodeMetricInf
} else {
numaUsage.Usage.Memory = memoryUsage
}

numaCpuUsage, err := p.getNodeNUMACPUUsage(numaID)
if err != nil {
errList = append(errList, err)
} else {
numaUsage.Usage.CPU = numaCpuUsage
}

nmi.NUMAUsage = append(nmi.NUMAUsage, numaUsage)
}
return nmi, errors.NewAggregate(errList)
@@ -358,11 +367,12 @@ func (p *nodeMetricsReporterPlugin) getGroupMetricInfo() ([]nodeapis.GroupMetric
if !ok || len(pods) == 0 {
continue
}
groupUsage, effectivePods, err := p.getGroupUsage(pods, qosLevel)
groupUsage, groupNUMAUsages, effectivePods, err := p.getGroupUsage(pods, qosLevel)
if err != nil {
errList = append(errList, err)
} else {
metricInfo.GenericUsage = groupUsage
metricInfo.NUMAUsage = groupNUMAUsages
for _, pod := range effectivePods {
metricInfo.PodList = append(metricInfo.PodList, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}.String())
}
@@ -373,19 +383,30 @@
return groupMetrics, errors.NewAggregate(errList)
}

func (p *nodeMetricsReporterPlugin) getPodUsage(pod *v1.Pod) (v1.ResourceList, bool, error) {
func (p *nodeMetricsReporterPlugin) getPodUsage(pod *v1.Pod) (v1.ResourceList, map[int]v1.ResourceList, machine.CPUSet, bool, error) {
rampUp := false
assignedNUMAs := machine.NewCPUSet()
numaUsage := make(map[int]v1.ResourceList)

var errList []error
containers, ok := p.metaReader.GetContainerEntries(string(pod.UID))
if !ok {
return nil, false, fmt.Errorf("failed to get container info for pod %v", pod.Name)
return nil, nil, assignedNUMAs, false, fmt.Errorf("failed to get container info for pod %v", pod.Name)
}
podCPUUsage := .0
podMemUsage := .0
for _, container := range containers {
if container.RampUp {
rampUp = true
}

containerInfo, existed := p.metaReader.GetContainerInfo(string(pod.UID), container.ContainerName)
if !existed {
return nil, nil, assignedNUMAs, false, fmt.Errorf("failed to get container info for %v/%v", pod.Name, container.ContainerName)
}

assignedNUMAs.Add(machine.GetCPUAssignmentNUMAs(containerInfo.TopologyAwareAssignments).ToSliceInt()...)

metricContainerCPUUsage, err := p.metaServer.GetContainerMetric(string(pod.UID), container.ContainerName, consts.MetricCPUUsageContainer)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get MetricCPUUsageContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
@@ -402,23 +423,63 @@ func (p *nodeMetricsReporterPlugin) getPodUsage(pod *v1.Pod) (v1.ResourceList, b
errList = append(errList, fmt.Errorf("failed to get MetricMemCacheContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
}
podMemUsage += metricContainerMemUsage.Value - metricContainerMemCache.Value

metricContainerNUMACPUUsage, err := p.metaServer.GetContainerNumaMetrics(string(pod.UID), container.ContainerName, consts.MetricsCPUUsageNUMAContainer)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get MetricsCPUUsageNUMAContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
}
for numaID, metricCPUUsage := range metricContainerNUMACPUUsage {
usages, ok := numaUsage[numaID]
if !ok {
usages = make(v1.ResourceList)
}

cpuUsage, ok := usages[v1.ResourceCPU]
if !ok {
cpuUsage = *resource.NewMilliQuantity(0, resource.DecimalSI)
}
cpuUsage.Add(*resource.NewMilliQuantity(int64(metricCPUUsage.Value*1000), resource.DecimalSI))
usages[v1.ResourceCPU] = cpuUsage
numaUsage[numaID] = usages
}

metricContainerAnonMemoryUsage, err := p.metaServer.GetContainerNumaMetrics(string(pod.UID), container.ContainerName, consts.MetricsMemAnonPerNumaContainer)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get MetricsMemAnonPerNumaContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
}
for numaID, metricAnonMemoryUsage := range metricContainerAnonMemoryUsage {
usages, ok := numaUsage[numaID]
if !ok {
usages = make(v1.ResourceList)
}

memUsage, ok := usages[v1.ResourceMemory]
if !ok {
memUsage = *resource.NewQuantity(0, resource.BinarySI)
}
memUsage.Add(*resource.NewQuantity(int64(metricAnonMemoryUsage.Value), resource.BinarySI))
usages[v1.ResourceMemory] = memUsage
numaUsage[numaID] = usages
}
}

cpu := resource.NewMilliQuantity(int64(podCPUUsage*1000), resource.DecimalSI)
memory := resource.NewQuantity(int64(podMemUsage), resource.BinarySI)

return v1.ResourceList{v1.ResourceMemory: *memory, v1.ResourceCPU: *cpu}, rampUp, errors.NewAggregate(errList)
return v1.ResourceList{v1.ResourceMemory: *memory, v1.ResourceCPU: *cpu}, numaUsage, assignedNUMAs, rampUp, errors.NewAggregate(errList)
}
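Both per-NUMA loops in getPodUsage follow the same accumulate-into-ResourceList pattern: fetch or create the node's ResourceList, fetch or zero-initialize the Quantity, add the sample, store both back. A runnable distillation of that pattern for CPU, against the upstream Kubernetes resource types (the helper name is ours):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// addNUMACPU folds one container's per-NUMA CPU sample (a float in cores)
// into a pod-level map, mirroring the accumulation loop in getPodUsage.
func addNUMACPU(numaUsage map[int]v1.ResourceList, numaID int, cores float64) {
	usages, ok := numaUsage[numaID]
	if !ok {
		usages = make(v1.ResourceList)
	}
	cpu, ok := usages[v1.ResourceCPU]
	if !ok {
		cpu = *resource.NewMilliQuantity(0, resource.DecimalSI)
	}
	cpu.Add(*resource.NewMilliQuantity(int64(cores*1000), resource.DecimalSI))
	usages[v1.ResourceCPU] = cpu
	numaUsage[numaID] = usages
}

func main() {
	numaUsage := make(map[int]v1.ResourceList)
	// Two containers of the same pod, each using 0.5 cores on NUMA 0.
	addNUMACPU(numaUsage, 0, 0.5)
	addNUMACPU(numaUsage, 0, 0.5)
	cpu := numaUsage[0][v1.ResourceCPU]
	fmt.Println("numa0 cpu:", cpu.String()) // "1", i.e. 1000m
}
```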

func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel string) (*nodeapis.ResourceMetric, []*v1.Pod, error) {
func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel string) (*nodeapis.ResourceMetric, []nodeapis.NUMAMetricInfo, []*v1.Pod, error) {
var errList []error

cpu := resource.NewQuantity(0, resource.DecimalSI)
memory := resource.NewQuantity(0, resource.BinarySI)

numaUsages := make(map[int]v1.ResourceList)

effectivePods := make([]*v1.Pod, 0)
for _, pod := range pods {
podUsage, rampUp, err := p.getPodUsage(pod)
podUsage, podNUMAUsage, assignedNUMAs, rampUp, err := p.getPodUsage(pod)
if err != nil {
general.ErrorS(err, "failed to getPodUsage", "pod", pod.Name)
continue
@@ -432,15 +493,39 @@ func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel strin
if podUsage.Memory().Cmp(*req.Memory()) < 0 {
podUsage[v1.ResourceMemory] = *req.Memory()
}

for _, numaID := range assignedNUMAs.ToSliceInt() {
podNUMAUsage[numaID] = map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *resource.NewMilliQuantity(req.Cpu().MilliValue()/int64(assignedNUMAs.Size()), resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(req.Memory().Value()/int64(assignedNUMAs.Size()), resource.BinarySI),
}
}
}
cpu.Add(*podUsage.Cpu())
memory.Add(*podUsage.Memory())

for numaID, podUsages := range podNUMAUsage {
usages, ok := numaUsages[numaID]
if !ok {
usages = make(v1.ResourceList)
}
for name, podResourceUsage := range podUsages {
usage, ok := usages[name]
if !ok {
usage = *resource.NewMilliQuantity(0, resource.DecimalSI)
}
usage.Add(podResourceUsage)
usages[name] = usage
}
numaUsages[numaID] = usages
}

klog.InfoS("pod usage", "pod", pod.Name, "cpu", podUsage.Cpu().AsApproximateFloat64(),
"memory", general.FormatMemoryQuantity(podUsage.Memory().AsApproximateFloat64()))
"memory", general.FormatMemoryQuantity(podUsage.Memory().AsApproximateFloat64()), "numaUsage", numaUsages)
}

resourceMetric := &nodeapis.ResourceMetric{}
resourceNUMAMetrics := make([]nodeapis.NUMAMetricInfo, 0)
aggMemory := p.getAggregatedMetric(memory, v1.ResourceMemory, "getGroupUsage", qosLevel, "memory")
if aggMemory == nil {
errList = append(errList, fmt.Errorf("failed to get enhough samples for group memory, qosLevel=%v", qosLevel))
@@ -455,15 +540,40 @@ func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel strin
resourceMetric.CPU = aggCPU
}

for numaID, resourceUsages := range numaUsages {
resourceNUMAMetric := nodeapis.ResourceMetric{}

cpuUsage := resourceUsages[v1.ResourceCPU]
aggNUMACPU := p.getAggregatedMetric(&cpuUsage, v1.ResourceCPU, "getGroupNUMAUsage", qosLevel, "cpu", strconv.Itoa(numaID))
if aggNUMACPU == nil {
errList = append(errList, fmt.Errorf("failed to get enhough samples for group numa cpu, qosLevel=%v, numa=%v", qosLevel, numaID))
} else {
resourceNUMAMetric.CPU = aggNUMACPU
}

memUsage := resourceUsages[v1.ResourceMemory]
aggNUMAMem := p.getAggregatedMetric(&memUsage, v1.ResourceMemory, "getGroupNUMAUsage", qosLevel, "memory", strconv.Itoa(numaID))
if aggNUMAMem == nil {
errList = append(errList, fmt.Errorf("failed to get enhough samples for group numa memory, qosLevel=%v, numa=%v", qosLevel, numaID))
} else {
resourceNUMAMetric.Memory = aggNUMAMem
}

resourceNUMAMetrics = append(resourceNUMAMetrics, nodeapis.NUMAMetricInfo{
NUMAId: numaID,
Usage: &resourceNUMAMetric,
})
}

err := errors.NewAggregate(errList)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

klog.InfoS("group usage", "qosLevel", qosLevel, "memory", general.FormatMemoryQuantity(memory.AsApproximateFloat64()),
"aggMemory", general.FormatMemoryQuantity(aggMemory.AsApproximateFloat64()), "cpu", cpu.AsApproximateFloat64(), "aggCPU", aggCPU.AsApproximateFloat64())

return resourceMetric, effectivePods, nil
return resourceMetric, resourceNUMAMetrics, effectivePods, nil
}
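In the ramp-up branch above, the pod's request stands in for measured usage and is divided evenly across its assigned NUMA nodes, using integer division of milli-CPU and bytes. A runnable sketch of that split under the same assumptions (helper name is ours):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// splitRequestAcrossNUMA divides a pod's request evenly over its assigned
// NUMA nodes, as the ramp-up branch does (integer division, so any
// remainder is dropped).
func splitRequestAcrossNUMA(cpuReq, memReq resource.Quantity, numaIDs []int) map[int]v1.ResourceList {
	out := make(map[int]v1.ResourceList, len(numaIDs))
	n := int64(len(numaIDs))
	for _, id := range numaIDs {
		out[id] = v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(cpuReq.MilliValue()/n, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(memReq.Value()/n, resource.BinarySI),
		}
	}
	return out
}

func main() {
	// pod4 in the test below: requests 2 CPU / 20Gi, assigned to NUMA 0 and 1.
	per := splitRequestAcrossNUMA(resource.MustParse("2"), resource.MustParse("20Gi"), []int{0, 1})
	for id, rl := range per {
		cpu, mem := rl[v1.ResourceCPU], rl[v1.ResourceMemory]
		fmt.Printf("numa%d: cpu=%s mem=%s\n", id, cpu.String(), mem.String())
	}
}
```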

func (p *nodeMetricsReporterPlugin) getNodeMemoryUsage() (*resource.Quantity, error) {
@@ -510,6 +620,19 @@ func (p *nodeMetricsReporterPlugin) getNodeNUMAMemoryUsage(numaID int) (*resourc
return v, nil
}

func (p *nodeMetricsReporterPlugin) getNodeNUMACPUUsage(numaID int) (*resource.Quantity, error) {
metricCPUUsageNuma, err := p.metaServer.GetNumaMetric(numaID, consts.MetricCPUUsageNuma)
if err != nil {
return nil, fmt.Errorf("failed to get metricCPUUsageNuma of numa%v, err %v", numaID, err)
}
v := resource.NewMilliQuantity(int64(metricCPUUsageNuma.Value*1000), resource.DecimalSI)
v = p.getAggregatedMetric(v, v1.ResourceCPU, "getNodeNUMACPUUsage", strconv.Itoa(numaID))
if v == nil {
return nil, fmt.Errorf("failed to get enough samples for numa%v memory", numaID)
}
return v, nil
}
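getNodeNUMACPUUsage converts the raw per-NUMA CPU metric, a float in cores, to a milli-quantity by scaling by 1000 and truncating; for example:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// MetricCPUUsageNuma reports cores as a float; the reporter scales by
	// 1000 and truncates to build a milli-quantity.
	cores := 2.25
	q := resource.NewMilliQuantity(int64(cores*1000), resource.DecimalSI)
	fmt.Println(q.String()) // "2250m"
}
```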

func (p *nodeMetricsReporterPlugin) getAggregatedMetric(value *resource.Quantity, resourceName v1.ResourceName, funcName string, args ...string) *resource.Quantity {
opts, ok := p.smoothWindowOpts[resourceName]
if !ok {
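Several branches above report "failed to get enough samples" when getAggregatedMetric returns nil. That behavior comes from the smooth-window aggregation the reporter wraps each metric in: samples are buffered per metric key, and an aggregate is produced only once the window has filled. A hedged, self-contained sketch of that idea (window size, averaging policy, and method name are assumptions; the real implementation is katalyst's smooth-window utility):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// smoothWindow buffers recent samples and yields an aggregate only once the
// window is full, which is why callers treat a nil result as "not enough
// samples yet".
type smoothWindow struct {
	size    int
	samples []int64 // milli-values
}

func (w *smoothWindow) GetWindowedResources(v resource.Quantity) *resource.Quantity {
	w.samples = append(w.samples, v.MilliValue())
	if len(w.samples) > w.size {
		w.samples = w.samples[1:] // keep only the most recent window
	}
	if len(w.samples) < w.size {
		return nil // not enough samples yet
	}
	var sum int64
	for _, s := range w.samples {
		sum += s
	}
	return resource.NewMilliQuantity(sum/int64(w.size), resource.DecimalSI)
}

func main() {
	w := &smoothWindow{size: 3}
	for _, cores := range []string{"1", "2", "3"} {
		if agg := w.GetWindowedResources(resource.MustParse(cores)); agg != nil {
			fmt.Println("aggregate:", agg.String()) // prints "2" after the 3rd sample
		}
	}
}
```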
@@ -43,6 +43,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

@@ -138,11 +139,16 @@ func TestNodeMetricUpdate(t *testing.T) {
for numaId := 0; numaId < 2; numaId++ {
f.SetNumaMetric(numaId, consts.MetricMemUsedNuma, utilmetric.MetricData{Value: 50 << 30})
f.SetNumaMetric(numaId, consts.MetricMemFilepageNuma, utilmetric.MetricData{Value: 10 << 30})
f.SetNumaMetric(numaId, consts.MetricCPUUsageNuma, utilmetric.MetricData{Value: 2})
}

f.SetContainerMetric("uid1", "container1", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid1", "container1", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
f.SetContainerMetric("uid1", "container1", consts.MetricMemCacheContainer, utilmetric.MetricData{Value: 10 << 30})
for numaId := 0; numaId < 2; numaId++ {
f.SetContainerNumaMetric("uid1", "container1", numaId, consts.MetricsCPUUsageNUMAContainer, utilmetric.MetricData{Value: 1})
f.SetContainerNumaMetric("uid1", "container1", numaId, consts.MetricsMemAnonPerNumaContainer, utilmetric.MetricData{Value: 1 << 30})
}

f.SetContainerMetric("uid2", "container2", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid2", "container2", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
@@ -151,6 +157,10 @@ func TestNodeMetricUpdate(t *testing.T) {
f.SetContainerMetric("uid3", "container3", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid3", "container3", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
f.SetContainerMetric("uid3", "container3", consts.MetricMemCacheContainer, utilmetric.MetricData{Value: 10 << 30})

f.SetContainerMetric("uid4", "container4", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid4", "container4", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
f.SetContainerMetric("uid4", "container4", consts.MetricMemCacheContainer, utilmetric.MetricData{Value: 10 << 30})
},
pods: []*corev1.Pod{
{
Expand Down Expand Up @@ -228,6 +238,31 @@ func TestNodeMetricUpdate(t *testing.T) {
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod4",
Namespace: "default",
UID: "uid4",
Annotations: map[string]string{apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSharedCores},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container4",
Resources: corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("40Gi"),
},
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("20Gi"),
},
},
},
},
},
},
},
containers: []types2.ContainerInfo{
{
@@ -254,6 +289,16 @@ func TestNodeMetricUpdate(t *testing.T) {
ContainerType: pluginapi.ContainerType_MAIN,
ContainerIndex: 0,
},
{
PodUID: "uid4",
PodNamespace: "default",
PodName: "pod4",
ContainerName: "container4",
ContainerType: pluginapi.ContainerType_MAIN,
ContainerIndex: 0,
TopologyAwareAssignments: map[int]machine.CPUSet{0: machine.NewCPUSet(0), 1: machine.NewCPUSet(2)},
RampUp: true,
},
},
},
wantErr: false,
@@ -266,12 +311,14 @@ func TestNodeMetricUpdate(t *testing.T) {
NUMAId: 0,
Usage: &nodeapis.ResourceMetric{
Memory: mustParse("40Gi"),
CPU: mustParse("2"),
},
},
{
NUMAId: 1,
Usage: &nodeapis.ResourceMetric{
Memory: mustParse("40Gi"),
CPU: mustParse("2"),
},
},
},
@@ -296,13 +343,28 @@ func TestNodeMetricUpdate(t *testing.T) {
{
QoSLevel: apiconsts.PodAnnotationQoSLevelSharedCores,
ResourceUsage: nodeapis.ResourceUsage{
NUMAUsage: nil,
NUMAUsage: []nodeapis.NUMAMetricInfo{
{
NUMAId: 0,
Usage: &nodeapis.ResourceMetric{
CPU: mustParse("2"),
Memory: mustParse("11Gi"),
},
},
{
NUMAId: 1,
Usage: &nodeapis.ResourceMetric{
CPU: mustParse("2"),
Memory: mustParse("11Gi"),
},
},
},
GenericUsage: &nodeapis.ResourceMetric{
CPU: mustParse("1"),
Memory: mustParse("10Gi"),
CPU: mustParse("3"),
Memory: mustParse("30Gi"),
},
},
PodList: []string{"default/pod1"},
PodList: []string{"default/pod1", "default/pod4"},
},
{
QoSLevel: apiconsts.PodAnnotationQoSLevelReclaimedCores,
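A quick cross-check of the expected values above: node-level NUMA memory usage is memUsed minus file-page cache (50Gi - 10Gi = 40Gi per node), and NUMA CPU usage is the raw MetricCPUUsageNuma sample (2 cores per node). For the shared-cores group, pod1 contributes measured usage (1 core; 20Gi - 10Gi cache = 10Gi), while ramp-up pod4 is clamped to its requests (2 cores, 20Gi), giving a GenericUsage of 3 cores and 30Gi. Per NUMA node, pod1 adds 1 core and 1Gi of anonymous memory from the fake metrics, and pod4's requests are split evenly over its two assigned nodes (1 core and 10Gi each), which yields the expected 2 cores and 11Gi per node.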
@@ -2441,7 +2441,7 @@ func TestUpdate(t *testing.T) {
metricsFetcher.SetContainerMetric(containerMetric.podUID, containerMetric.containerName, containerMetric.metricName, containerMetric.metricValue)
}
for _, containerNUMAMetric := range tt.containerNUMAMetrics {
metricsFetcher.SetContainerNumaMetric(containerNUMAMetric.podUID, containerNUMAMetric.containerName, strconv.Itoa(containerNUMAMetric.numaID), containerNUMAMetric.metricName, containerNUMAMetric.metricValue)
metricsFetcher.SetContainerNumaMetric(containerNUMAMetric.podUID, containerNUMAMetric.containerName, containerNUMAMetric.numaID, containerNUMAMetric.metricName, containerNUMAMetric.metricValue)
}
for _, qosClassMetric := range tt.cgroupMetrics {
metricsFetcher.SetCgroupMetric(qosClassMetric.cgroupPath, qosClassMetric.metricName, qosClassMetric.metricValue)