Skip to content

Commit

Permalink
feat: report NUMA cpu usage
Browse files Browse the repository at this point in the history
Signed-off-by: linzhecheng <[email protected]>
  • Loading branch information
cheney-lin committed Oct 18, 2024
1 parent e9504f1 commit 22e8f39
Show file tree
Hide file tree
Showing 19 changed files with 395 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package memory

import (
"context"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -311,7 +310,7 @@ func TestNumaMemoryPressurePlugin_GetTopEvictionPods(t *testing.T) {
now := time.Now()
for i, pod := range bePods {
for numaID, usage := range bePodUsageNuma[i] {
fakeMetricsFetcher.SetContainerNumaMetric(string(pod.UID), pod.Spec.Containers[0].Name, strconv.Itoa(numaID), consts.MetricsMemTotalPerNumaContainer, utilMetric.MetricData{Value: usage, Time: &now})
fakeMetricsFetcher.SetContainerNumaMetric(string(pod.UID), pod.Spec.Containers[0].Name, numaID, consts.MetricsMemTotalPerNumaContainer, utilMetric.MetricData{Value: usage, Time: &now})
}
}
for numaID, numaTotal := range numaTotalMap {
Expand Down
141 changes: 132 additions & 9 deletions pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
schedutil "github.com/kubewharf/katalyst-core/pkg/scheduler/util"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

Expand Down Expand Up @@ -309,6 +310,14 @@ func (p *nodeMetricsReporterPlugin) getNodeMetricInfo() (*nodeapis.NodeMetricInf
} else {
numaUsage.Usage.Memory = memoryUsage
}

numaCpuUsage, err := p.getNodeNUMACPUUsage(numaID)
if err != nil {
errList = append(errList, err)
} else {
numaUsage.Usage.CPU = numaCpuUsage
}

nmi.NUMAUsage = append(nmi.NUMAUsage, numaUsage)
}
return nmi, errors.NewAggregate(errList)
Expand Down Expand Up @@ -358,11 +367,12 @@ func (p *nodeMetricsReporterPlugin) getGroupMetricInfo() ([]nodeapis.GroupMetric
if !ok || len(pods) == 0 {
continue
}
groupUsage, effectivePods, err := p.getGroupUsage(pods, qosLevel)
groupUsage, groupNUMAUsages, effectivePods, err := p.getGroupUsage(pods, qosLevel)
if err != nil {
errList = append(errList, err)
} else {
metricInfo.GenericUsage = groupUsage
metricInfo.NUMAUsage = groupNUMAUsages
for _, pod := range effectivePods {
metricInfo.PodList = append(metricInfo.PodList, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}.String())
}
Expand All @@ -373,19 +383,30 @@ func (p *nodeMetricsReporterPlugin) getGroupMetricInfo() ([]nodeapis.GroupMetric
return groupMetrics, errors.NewAggregate(errList)
}

func (p *nodeMetricsReporterPlugin) getPodUsage(pod *v1.Pod) (v1.ResourceList, bool, error) {
func (p *nodeMetricsReporterPlugin) getPodUsage(pod *v1.Pod) (v1.ResourceList, map[int]v1.ResourceList, machine.CPUSet, bool, error) {
rampUp := false
assignedNUMAs := machine.NewCPUSet()
numaUsage := make(map[int]v1.ResourceList)

var errList []error
containers, ok := p.metaReader.GetContainerEntries(string(pod.UID))
if !ok {
return nil, false, fmt.Errorf("failed to get container info for pod %v", pod.Name)
return nil, nil, assignedNUMAs, false, fmt.Errorf("failed to get container info for pod %v", pod.Name)
}
podCPUUsage := .0
podMemUsage := .0
for _, container := range containers {
if container.RampUp {
rampUp = true
}

containerInfo, existed := p.metaReader.GetContainerInfo(string(pod.UID), container.ContainerName)
if !existed {
return nil, nil, assignedNUMAs, false, fmt.Errorf("failed to get container info for %v/%v", pod.Name, container.ContainerName)
}

assignedNUMAs.Add(machine.GetCPUAssignmentNUMAs(containerInfo.TopologyAwareAssignments).ToSliceInt()...)

metricContainerCPUUsage, err := p.metaServer.GetContainerMetric(string(pod.UID), container.ContainerName, consts.MetricCPUUsageContainer)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get MetricCPUUsageContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
Expand All @@ -402,23 +423,63 @@ func (p *nodeMetricsReporterPlugin) getPodUsage(pod *v1.Pod) (v1.ResourceList, b
errList = append(errList, fmt.Errorf("failed to get MetricMemCacheContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
}
podMemUsage += metricContainerMemUsage.Value - metricContainerMemCache.Value

metricContainerNUMACPUUsage, err := p.metaServer.GetContainerNumaMetrics(string(pod.UID), container.ContainerName, consts.MetricsCPUUsageNUMAContainer)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get MetricsCPUUsageNUMAContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
}
for numaID, metricCPUUsage := range metricContainerNUMACPUUsage {
usages, ok := numaUsage[numaID]
if !ok {
usages = make(v1.ResourceList)
}

cpuUsage, ok := usages[v1.ResourceCPU]
if !ok {
cpuUsage = *resource.NewMilliQuantity(0, resource.DecimalSI)
}
cpuUsage.Add(*resource.NewMilliQuantity(int64(metricCPUUsage.Value*1000), resource.DecimalSI))
usages[v1.ResourceCPU] = cpuUsage
numaUsage[numaID] = usages
}

metricContainerAnonMemoryUsage, err := p.metaServer.GetContainerNumaMetrics(string(pod.UID), container.ContainerName, consts.MetricsMemAnonPerNumaContainer)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get MetricsMemAnonPerNumaContainer, podUID=%v, containerName=%v, err=%v", pod.UID, container.ContainerName, err))
}
for numaID, metricAnonMemoryUsage := range metricContainerAnonMemoryUsage {
usages, ok := numaUsage[numaID]
if !ok {
usages = make(v1.ResourceList)
}

memUsage, ok := usages[v1.ResourceMemory]
if !ok {
memUsage = *resource.NewQuantity(0, resource.BinarySI)
}
memUsage.Add(*resource.NewQuantity(int64(metricAnonMemoryUsage.Value), resource.BinarySI))
usages[v1.ResourceMemory] = memUsage
numaUsage[numaID] = usages
}
}

cpu := resource.NewMilliQuantity(int64(podCPUUsage*1000), resource.DecimalSI)
memory := resource.NewQuantity(int64(podMemUsage), resource.BinarySI)

return v1.ResourceList{v1.ResourceMemory: *memory, v1.ResourceCPU: *cpu}, rampUp, errors.NewAggregate(errList)
return v1.ResourceList{v1.ResourceMemory: *memory, v1.ResourceCPU: *cpu}, numaUsage, assignedNUMAs, rampUp, errors.NewAggregate(errList)
}

func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel string) (*nodeapis.ResourceMetric, []*v1.Pod, error) {
func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel string) (*nodeapis.ResourceMetric, []nodeapis.NUMAMetricInfo, []*v1.Pod, error) {
var errList []error

cpu := resource.NewQuantity(0, resource.DecimalSI)
memory := resource.NewQuantity(0, resource.BinarySI)

numaUsages := make(map[int]v1.ResourceList)

effectivePods := make([]*v1.Pod, 0)
for _, pod := range pods {
podUsage, rampUp, err := p.getPodUsage(pod)
podUsage, podNUMAUsage, assignedNUMAs, rampUp, err := p.getPodUsage(pod)
if err != nil {
general.ErrorS(err, "failed to getPodUsage", "pod", pod.Name)
continue
Expand All @@ -432,15 +493,39 @@ func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel strin
if podUsage.Memory().Cmp(*req.Memory()) < 0 {
podUsage[v1.ResourceMemory] = *req.Memory()
}

for numaID := range assignedNUMAs.ToSliceInt() {
podNUMAUsage[numaID] = map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *resource.NewMilliQuantity(req.Cpu().MilliValue()/int64(assignedNUMAs.Size()), resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(req.Memory().Value()/int64(assignedNUMAs.Size()), resource.BinarySI),
}
}
}
cpu.Add(*podUsage.Cpu())
memory.Add(*podUsage.Memory())

for numaID, podUsages := range podNUMAUsage {
usages, ok := numaUsages[numaID]
if !ok {
usages = make(v1.ResourceList)
}
for name, podResourceUsage := range podUsages {
usage, ok := usages[name]
if !ok {
usage = *resource.NewMilliQuantity(0, resource.DecimalSI)
}
usage.Add(podResourceUsage)
usages[name] = usage
}
numaUsages[numaID] = usages
}

klog.InfoS("pod usage", "pod", pod.Name, "cpu", podUsage.Cpu().AsApproximateFloat64(),
"memory", general.FormatMemoryQuantity(podUsage.Memory().AsApproximateFloat64()))
"memory", general.FormatMemoryQuantity(podUsage.Memory().AsApproximateFloat64()), "numaUsage", numaUsages)
}

resourceMetric := &nodeapis.ResourceMetric{}
resourceNUMAMetrics := make([]nodeapis.NUMAMetricInfo, 0)
aggMemory := p.getAggregatedMetric(memory, v1.ResourceMemory, "getGroupUsage", qosLevel, "memory")
if aggMemory == nil {
errList = append(errList, fmt.Errorf("failed to get enhough samples for group memory, qosLevel=%v", qosLevel))
Expand All @@ -455,15 +540,40 @@ func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel strin
resourceMetric.CPU = aggCPU
}

for numaID, resourceUsages := range numaUsages {
resourceNUMAMetric := nodeapis.ResourceMetric{}

cpuUsage := resourceUsages[v1.ResourceCPU]
aggNUMACPU := p.getAggregatedMetric(&cpuUsage, v1.ResourceCPU, "getGroupNUMAUsage", qosLevel, "cpu", strconv.Itoa(numaID))
if aggNUMACPU == nil {
errList = append(errList, fmt.Errorf("failed to get enhough samples for group numa cpu, qosLevel=%v, numa=%v", qosLevel, numaID))
} else {
resourceNUMAMetric.CPU = aggNUMACPU
}

memUsage := resourceUsages[v1.ResourceMemory]
aggNUMAMem := p.getAggregatedMetric(&memUsage, v1.ResourceMemory, "getGroupNUMAUsage", qosLevel, "memory", strconv.Itoa(numaID))
if aggNUMAMem == nil {
errList = append(errList, fmt.Errorf("failed to get enhough samples for group numa memory, qosLevel=%v, numa=%v", qosLevel, numaID))
} else {
resourceNUMAMetric.Memory = aggNUMAMem
}

resourceNUMAMetrics = append(resourceNUMAMetrics, nodeapis.NUMAMetricInfo{
NUMAId: numaID,
Usage: &resourceNUMAMetric,
})
}

err := errors.NewAggregate(errList)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

klog.InfoS("group usage", "qosLevel", qosLevel, "memory", general.FormatMemoryQuantity(memory.AsApproximateFloat64()),
"aggMemory", general.FormatMemoryQuantity(aggMemory.AsApproximateFloat64()), "cpu", cpu.AsApproximateFloat64(), "aggCPU", aggCPU.AsApproximateFloat64())

return resourceMetric, effectivePods, nil
return resourceMetric, resourceNUMAMetrics, effectivePods, nil
}

func (p *nodeMetricsReporterPlugin) getNodeMemoryUsage() (*resource.Quantity, error) {
Expand Down Expand Up @@ -510,6 +620,19 @@ func (p *nodeMetricsReporterPlugin) getNodeNUMAMemoryUsage(numaID int) (*resourc
return v, nil
}

// getNodeNUMACPUUsage returns the smoothed CPU usage of the given NUMA node
// as a milli-core Quantity, aggregated through the plugin's smooth window.
// It returns an error when the raw NUMA CPU metric is unavailable or when the
// smooth window has not yet collected enough samples.
func (p *nodeMetricsReporterPlugin) getNodeNUMACPUUsage(numaID int) (*resource.Quantity, error) {
	metricCPUUsageNuma, err := p.metaServer.GetNumaMetric(numaID, consts.MetricCPUUsageNuma)
	if err != nil {
		return nil, fmt.Errorf("failed to get metricCPUUsageNuma of numa%v, err %v", numaID, err)
	}
	// The metric value is in cores; convert to milli-cores before smoothing.
	v := resource.NewMilliQuantity(int64(metricCPUUsageNuma.Value*1000), resource.DecimalSI)
	v = p.getAggregatedMetric(v, v1.ResourceCPU, "getNodeNUMACPUUsage", strconv.Itoa(numaID))
	if v == nil {
		// Fix: the original message reported "memory" for this CPU metric.
		return nil, fmt.Errorf("failed to get enough samples for numa%v cpu", numaID)
	}
	return v, nil
}

func (p *nodeMetricsReporterPlugin) getAggregatedMetric(value *resource.Quantity, resourceName v1.ResourceName, funcName string, args ...string) *resource.Quantity {
opts, ok := p.smoothWindowOpts[resourceName]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

Expand Down Expand Up @@ -138,11 +139,16 @@ func TestNodeMetricUpdate(t *testing.T) {
for numaId := 0; numaId < 2; numaId++ {
f.SetNumaMetric(numaId, consts.MetricMemUsedNuma, utilmetric.MetricData{Value: 50 << 30})
f.SetNumaMetric(numaId, consts.MetricMemFilepageNuma, utilmetric.MetricData{Value: 10 << 30})
f.SetNumaMetric(numaId, consts.MetricCPUUsageNuma, utilmetric.MetricData{Value: 2})
}

f.SetContainerMetric("uid1", "container1", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid1", "container1", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
f.SetContainerMetric("uid1", "container1", consts.MetricMemCacheContainer, utilmetric.MetricData{Value: 10 << 30})
for numaId := 0; numaId < 2; numaId++ {
f.SetContainerNumaMetric("uid1", "container1", numaId, consts.MetricsCPUUsageNUMAContainer, utilmetric.MetricData{Value: 1})
f.SetContainerNumaMetric("uid1", "container1", numaId, consts.MetricsMemAnonPerNumaContainer, utilmetric.MetricData{Value: 1 << 30})
}

f.SetContainerMetric("uid2", "container2", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid2", "container2", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
Expand All @@ -151,6 +157,10 @@ func TestNodeMetricUpdate(t *testing.T) {
f.SetContainerMetric("uid3", "container3", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid3", "container3", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
f.SetContainerMetric("uid3", "container3", consts.MetricMemCacheContainer, utilmetric.MetricData{Value: 10 << 30})

f.SetContainerMetric("uid4", "container4", consts.MetricCPUUsageContainer, utilmetric.MetricData{Value: 1})
f.SetContainerMetric("uid4", "container4", consts.MetricMemUsageContainer, utilmetric.MetricData{Value: 20 << 30})
f.SetContainerMetric("uid4", "container4", consts.MetricMemCacheContainer, utilmetric.MetricData{Value: 10 << 30})
},
pods: []*corev1.Pod{
{
Expand Down Expand Up @@ -228,6 +238,31 @@ func TestNodeMetricUpdate(t *testing.T) {
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod4",
Namespace: "default",
UID: "uid4",
Annotations: map[string]string{apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSharedCores},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container4",
Resources: corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("40Gi"),
},
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("20Gi"),
},
},
},
},
},
},
},
containers: []types2.ContainerInfo{
{
Expand All @@ -254,6 +289,16 @@ func TestNodeMetricUpdate(t *testing.T) {
ContainerType: pluginapi.ContainerType_MAIN,
ContainerIndex: 0,
},
{
PodUID: "uid4",
PodNamespace: "default",
PodName: "pod4",
ContainerName: "container4",
ContainerType: pluginapi.ContainerType_MAIN,
ContainerIndex: 0,
TopologyAwareAssignments: map[int]machine.CPUSet{0: machine.NewCPUSet(0), 1: machine.NewCPUSet(2)},
RampUp: true,
},
},
},
wantErr: false,
Expand All @@ -266,12 +311,14 @@ func TestNodeMetricUpdate(t *testing.T) {
NUMAId: 0,
Usage: &nodeapis.ResourceMetric{
Memory: mustParse("40Gi"),
CPU: mustParse("2"),
},
},
{
NUMAId: 1,
Usage: &nodeapis.ResourceMetric{
Memory: mustParse("40Gi"),
CPU: mustParse("2"),
},
},
},
Expand All @@ -296,13 +343,28 @@ func TestNodeMetricUpdate(t *testing.T) {
{
QoSLevel: apiconsts.PodAnnotationQoSLevelSharedCores,
ResourceUsage: nodeapis.ResourceUsage{
NUMAUsage: nil,
NUMAUsage: []nodeapis.NUMAMetricInfo{
{
NUMAId: 0,
Usage: &nodeapis.ResourceMetric{
CPU: mustParse("2"),
Memory: mustParse("11Gi"),
},
},
{
NUMAId: 1,
Usage: &nodeapis.ResourceMetric{
CPU: mustParse("2"),
Memory: mustParse("11Gi"),
},
},
},
GenericUsage: &nodeapis.ResourceMetric{
CPU: mustParse("1"),
Memory: mustParse("10Gi"),
CPU: mustParse("3"),
Memory: mustParse("30Gi"),
},
},
PodList: []string{"default/pod1"},
PodList: []string{"default/pod1", "default/pod4"},
},
{
QoSLevel: apiconsts.PodAnnotationQoSLevelReclaimedCores,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2441,7 +2441,7 @@ func TestUpdate(t *testing.T) {
metricsFetcher.SetContainerMetric(containerMetric.podUID, containerMetric.containerName, containerMetric.metricName, containerMetric.metricValue)
}
for _, containerNUMAMetric := range tt.containerNUMAMetrics {
metricsFetcher.SetContainerNumaMetric(containerNUMAMetric.podUID, containerNUMAMetric.containerName, strconv.Itoa(containerNUMAMetric.numaID), containerNUMAMetric.metricName, containerNUMAMetric.metricValue)
metricsFetcher.SetContainerNumaMetric(containerNUMAMetric.podUID, containerNUMAMetric.containerName, containerNUMAMetric.numaID, containerNUMAMetric.metricName, containerNUMAMetric.metricValue)
}
for _, qosClassMetric := range tt.cgroupMetrics {
metricsFetcher.SetCgroupMetric(qosClassMetric.cgroupPath, qosClassMetric.metricName, qosClassMetric.metricValue)
Expand Down
Loading

0 comments on commit 22e8f39

Please sign in to comment.