
Commit
qrm cpu plugin support reclaimed with numa binding admission and provision
luomingmeng committed Oct 11, 2024
1 parent c0f3ed5 commit cd4ebfd
Showing 13 changed files with 707 additions and 112 deletions.
33 changes: 21 additions & 12 deletions cmd/katalyst-agent/app/options/qrm/cpu_plugin.go
@@ -34,13 +34,15 @@ type CPUOptions struct {
}

type CPUDynamicPolicyOptions struct {
EnableCPUAdvisor bool
EnableCPUPressureEviction bool
LoadPressureEvictionSkipPools []string
EnableSyncingCPUIdle bool
EnableCPUIdle bool
CPUNUMAHintPreferPolicy string
CPUNUMAHintPreferLowThreshold float64
EnableCPUAdvisor bool
EnableCPUPressureEviction bool
LoadPressureEvictionSkipPools []string
EnableSyncingCPUIdle bool
EnableCPUIdle bool
CPUNUMAHintPreferPolicy string
CPUNUMAHintPreferLowThreshold float64
CPUNUMAReclaimedHintPreferPolicy string
CPUNUMAReclaimedHintPreferLowThreshold float64
}

type CPUNativePolicyOptions struct {
@@ -54,11 +56,12 @@ func NewCPUOptions() *CPUOptions {
ReservedCPUCores: 0,
SkipCPUStateCorruption: false,
CPUDynamicPolicyOptions: CPUDynamicPolicyOptions{
EnableCPUAdvisor: false,
EnableCPUPressureEviction: false,
EnableSyncingCPUIdle: false,
EnableCPUIdle: false,
CPUNUMAHintPreferPolicy: cpuconsts.CPUNUMAHintPreferPolicySpreading,
EnableCPUAdvisor: false,
EnableCPUPressureEviction: false,
EnableSyncingCPUIdle: false,
EnableCPUIdle: false,
CPUNUMAHintPreferPolicy: cpuconsts.CPUNUMAHintPreferPolicySpreading,
CPUNUMAReclaimedHintPreferPolicy: cpuconsts.CPUNUMAHintPreferPolicyNone,
LoadPressureEvictionSkipPools: []string{
commonstate.PoolNameReclaim,
commonstate.PoolNameDedicated,
@@ -97,6 +100,10 @@ func (o *CPUOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"it decides hint preference calculation strategy")
fs.Float64Var(&o.CPUNUMAHintPreferLowThreshold, "cpu-numa-hint-prefer-low-threshold", o.CPUNUMAHintPreferLowThreshold,
"it indicates the threshold for applying CPUNUMAHintPreferPolicy dynamically; it takes effect when CPUNUMAHintPreferPolicy is set to dynamic_packing")
fs.StringVar(&o.CPUNUMAReclaimedHintPreferPolicy, "cpu-numa-reclaimed-hint-prefer-policy", o.CPUNUMAReclaimedHintPreferPolicy,
"it decides hint preference calculation strategy for reclaimed cpu")
fs.Float64Var(&o.CPUNUMAReclaimedHintPreferLowThreshold, "cpu-numa-reclaimed-hint-prefer-low-threshold", o.CPUNUMAReclaimedHintPreferLowThreshold,
"it indicates the threshold for applying CPUNUMAReclaimedHintPreferPolicy dynamically; it takes effect when CPUNUMAReclaimedHintPreferPolicy is set to dynamic_packing")
fs.StringVar(&o.CPUAllocationOption, "cpu-allocation-option",
o.CPUAllocationOption, "The allocation option of cpu (packed/distributed). The default value is packed."+
"in cases where more than one NUMA node is required to satisfy the allocation.")
@@ -118,5 +125,7 @@ func (o *CPUOptions) ApplyTo(conf *qrmconfig.CPUQRMPluginConfig) error {
conf.CPUAllocationOption = o.CPUAllocationOption
conf.CPUNUMAHintPreferPolicy = o.CPUNUMAHintPreferPolicy
conf.CPUNUMAHintPreferLowThreshold = o.CPUNUMAHintPreferLowThreshold
conf.CPUNUMAReclaimedHintPreferPolicy = o.CPUNUMAReclaimedHintPreferPolicy
conf.CPUNUMAReclaimedHintPreferLowThreshold = o.CPUNUMAReclaimedHintPreferLowThreshold
return nil
}
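
A minimal sketch of how the two new knobs would travel from flags to config, with cut-down stand-ins for CPUOptions and CPUQRMPluginConfig; the flag names and defaults mirror the diff above, but this is an illustration rather than the real katalyst-agent bootstrap.

package main

import (
	"flag"
	"fmt"
)

// Stand-ins for katalyst's CPUOptions / CPUQRMPluginConfig, trimmed to the two new fields.
type cpuOptions struct {
	CPUNUMAReclaimedHintPreferPolicy       string
	CPUNUMAReclaimedHintPreferLowThreshold float64
}

type cpuQRMPluginConfig struct {
	CPUNUMAReclaimedHintPreferPolicy       string
	CPUNUMAReclaimedHintPreferLowThreshold float64
}

// addFlags registers the two new flags under the same names as the diff.
func (o *cpuOptions) addFlags(fs *flag.FlagSet) {
	fs.StringVar(&o.CPUNUMAReclaimedHintPreferPolicy, "cpu-numa-reclaimed-hint-prefer-policy",
		"none", "hint preference calculation strategy for reclaimed cpu")
	fs.Float64Var(&o.CPUNUMAReclaimedHintPreferLowThreshold, "cpu-numa-reclaimed-hint-prefer-low-threshold",
		0, "threshold used when the policy is dynamic_packing")
}

// applyTo copies each option onto its matching config field, the step ApplyTo
// performs above: the reclaimed policy must come from the reclaimed option.
func (o *cpuOptions) applyTo(conf *cpuQRMPluginConfig) {
	conf.CPUNUMAReclaimedHintPreferPolicy = o.CPUNUMAReclaimedHintPreferPolicy
	conf.CPUNUMAReclaimedHintPreferLowThreshold = o.CPUNUMAReclaimedHintPreferLowThreshold
}

func main() {
	fs := flag.NewFlagSet("katalyst-agent", flag.ExitOnError)
	opts := &cpuOptions{}
	opts.addFlags(fs)
	_ = fs.Parse([]string{
		"--cpu-numa-reclaimed-hint-prefer-policy=dynamic_packing",
		"--cpu-numa-reclaimed-hint-prefer-low-threshold=0.5",
	})

	conf := &cpuQRMPluginConfig{}
	opts.applyTo(conf)
	fmt.Printf("%+v\n", conf) // {CPUNUMAReclaimedHintPreferPolicy:dynamic_packing CPUNUMAReclaimedHintPreferLowThreshold:0.5}
}
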
83 changes: 71 additions & 12 deletions pkg/agent/qrm-plugins/commonstate/state.go
@@ -92,19 +92,54 @@ func (am *AllocationMeta) GetSpecifiedPoolName() string {
return GetSpecifiedPoolName(am.QoSLevel, am.Annotations[consts.PodAnnotationCPUEnhancementCPUSet])
}

// GetSpecifiedNUMABindingNUMAID parses the numa id for AllocationInfo
func (am *AllocationMeta) GetSpecifiedNUMABindingNUMAID() (int, error) {
if am == nil {
return FakedNUMAID, fmt.Errorf("empty am")
}

if _, ok := am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint]; !ok {
return FakedNUMAID, nil
}

numaSet, pErr := machine.Parse(am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint])
if pErr != nil {
return FakedNUMAID, fmt.Errorf("parse numaHintStr: %s failed with error: %v",
am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint], pErr)
} else if numaSet.Size() != 1 {
return FakedNUMAID, fmt.Errorf("parse numaHintStr: %s with invalid size", numaSet.String())
}

return numaSet.ToSliceNoSortInt()[0], nil
}

// SetSpecifiedNUMABindingNUMAID sets the numa id for AllocationInfo
func (am *AllocationMeta) SetSpecifiedNUMABindingNUMAID(numaID uint64) {
if am == nil {
return
}

if am.Annotations == nil {
am.Annotations = make(map[string]string)
}

am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint] = machine.NewCPUSet(int(numaID)).String()
}

// GetSpecifiedNUMABindingPoolName gets the numa_binding pool name
// for numa_binding shared_cores according to enhancements and NUMA hint
func (am *AllocationMeta) GetSpecifiedNUMABindingPoolName() (string, error) {
if !am.CheckSharedNUMABinding() {
return EmptyOwnerPoolName, fmt.Errorf("GetSpecifiedNUMABindingPoolName only for numa_binding shared_cores")
}

numaSet, pErr := machine.Parse(am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint])
if pErr != nil {
return EmptyOwnerPoolName, fmt.Errorf("parse numaHintStr: %s failed with error: %v",
am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint], pErr)
} else if numaSet.Size() != 1 {
return EmptyOwnerPoolName, fmt.Errorf("parse numaHintStr: %s with invalid size", numaSet.String())
numaID, err := am.GetSpecifiedNUMABindingNUMAID()
if err != nil {
return EmptyOwnerPoolName, err
}

if numaID == FakedNUMAID {
return EmptyOwnerPoolName, fmt.Errorf("invalid numa id for numa_binding shared_cores")
}

specifiedPoolName := am.GetSpecifiedPoolName()
@@ -113,10 +148,10 @@ func (am *AllocationMeta) GetSpecifiedNUMABindingPoolName() (string, error) {
return EmptyOwnerPoolName, fmt.Errorf("empty specifiedPoolName")
}

return GetNUMAPoolName(specifiedPoolName, numaSet.ToSliceNoSortUInt64()[0]), nil
return GetNUMAPoolName(specifiedPoolName, numaID), nil
}

func GetNUMAPoolName(candidateSpecifiedPoolName string, targetNUMANode uint64) string {
func GetNUMAPoolName(candidateSpecifiedPoolName string, targetNUMANode int) string {
return fmt.Sprintf("%s%s%d", candidateSpecifiedPoolName, NUMAPoolInfix, targetNUMANode)
}

@@ -191,6 +226,14 @@ func (am *AllocationMeta) CheckNUMABinding() bool {
consts.PodAnnotationMemoryEnhancementNumaBindingEnable
}

// CheckActualNUMABinding returns true if the AllocationInfo is for a pod that is
// actually bound to a specific NUMA node, i.e. its NUMA hint annotation is non-empty
func (am *AllocationMeta) CheckActualNUMABinding() bool {
if am == nil {
return false
}
return am.Annotations[cpuconsts.CPUStateAnnotationKeyNUMAHint] != ""
}

// CheckDedicatedNUMABinding returns true if the AllocationInfo is for pod with
// dedicated-qos and numa-binding enhancement
func (am *AllocationMeta) CheckDedicatedNUMABinding() bool {
@@ -206,13 +249,29 @@ func (am *AllocationMeta) CheckSharedNUMABinding() bool {
// CheckSharedOrDedicatedNUMABinding returns true if the AllocationInfo is for pod with
// shared-qos or dedicated-qos and numa-binding enhancement
func (am *AllocationMeta) CheckSharedOrDedicatedNUMABinding() bool {
if am == nil {
return false
}

return am.CheckSharedNUMABinding() || am.CheckDedicatedNUMABinding()
}

// CheckReclaimedNUMABinding returns true if the AllocationInfo is for pod with
// reclaimed-qos and numa-binding enhancement
func (am *AllocationMeta) CheckReclaimedNUMABinding() bool {
return am.CheckReclaimed() && am.CheckNUMABinding()
}

// CheckReclaimedActualNUMABinding returns true if the AllocationInfo is for a pod with
// reclaimed-qos, the numa-binding enhancement, and a non-empty NUMA hint, which means
// the container is allocated on a specific NUMA node
func (am *AllocationMeta) CheckReclaimedActualNUMABinding() bool {
return am.CheckReclaimedNUMABinding() && am.CheckActualNUMABinding()
}

// CheckReclaimedNonActualNUMABinding returns true if the AllocationInfo is for a pod with
// reclaimed-qos that is not actually bound to a specific NUMA node, which means
// the pod can be allocated across multiple NUMA nodes
func (am *AllocationMeta) CheckReclaimedNonActualNUMABinding() bool {
return am.CheckReclaimed() && !am.CheckActualNUMABinding()
}

// CheckNumaExclusive returns true if the AllocationInfo is for pod with numa-exclusive enhancement
func (am *AllocationMeta) CheckNumaExclusive() bool {
if am == nil {
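
The new getter/setter pair round-trips a NUMA id through a pod annotation: a missing hint yields FakedNUMAID without error, a malformed hint is an error, and a set id parses back out. Below is a cut-down, runnable model of that contract; the annotation key is hypothetical, and strconv stands in for machine.Parse (which in katalyst parses a cpuset-style string).

package main

import (
	"fmt"
	"strconv"
)

const (
	fakedNUMAID = -1
	// hypothetical annotation key; the real one is cpuconsts.CPUStateAnnotationKeyNUMAHint
	numaHintKey = "numa_hint"
)

// meta is a cut-down stand-in for AllocationMeta: just the annotations map.
type meta struct{ annotations map[string]string }

// setNUMAID mirrors SetSpecifiedNUMABindingNUMAID: lazily create the map,
// then store the NUMA id as the hint annotation.
func (m *meta) setNUMAID(id int) {
	if m.annotations == nil {
		m.annotations = map[string]string{}
	}
	m.annotations[numaHintKey] = strconv.Itoa(id)
}

// numaID mirrors GetSpecifiedNUMABindingNUMAID: a missing hint means "not
// actually bound" (faked id, no error); a malformed hint is an error.
func (m *meta) numaID() (int, error) {
	hint, ok := m.annotations[numaHintKey]
	if !ok {
		return fakedNUMAID, nil
	}
	id, err := strconv.Atoi(hint)
	if err != nil {
		return fakedNUMAID, fmt.Errorf("parse numaHintStr %q failed: %w", hint, err)
	}
	return id, nil
}

func main() {
	m := &meta{}
	fmt.Println(m.numaID()) // -1 <nil>: no hint yet, so not actually NUMA-bound
	m.setNUMAID(2)
	fmt.Println(m.numaID()) // 2 <nil>: the id round-trips through the annotation
}
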
8 changes: 5 additions & 3 deletions pkg/agent/qrm-plugins/cpu/consts/consts.go
@@ -51,11 +51,13 @@ const (
)

const (
// packing: refers to the strategy of putting as many containers as possible onto a single NUMA node in order to utilize the resources efficiently and reduce fragmentation.
// CPUNUMAHintPreferPolicyPacking refers to the strategy of putting as many containers as possible onto a single NUMA node in order to utilize the resources efficiently and reduce fragmentation.
CPUNUMAHintPreferPolicyPacking = "packing"
// spreading: tries to distributing containers across multiple nodes. Aiming to balance the load by avoiding overloading individual nodes.
// CPUNUMAHintPreferPolicySpreading tries to distribute containers across multiple NUMA nodes, aiming to balance load by avoiding overloading individual nodes.
CPUNUMAHintPreferPolicySpreading = "spreading"
// dynamic_packing: refers to the strategy of putting as many containers as possible onto a single NUMA node until the node hits configurable threshold.
// CPUNUMAHintPreferPolicyDynamicPacking refers to the strategy of putting as many containers as possible onto a single NUMA node until the node hits a configurable threshold.
// If all nodes hit the threshold, the spreading policy is used instead.
CPUNUMAHintPreferPolicyDynamicPacking = "dynamic_packing"
// CPUNUMAHintPreferPolicyNone refers to the strategy of putting containers onto any NUMA node which is not overloaded.
CPUNUMAHintPreferPolicyNone = "none"
)
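
One way a hint provider could act on these four values is sketched below; the load-based scoring is invented for illustration rather than taken from katalyst's hint-calculation code, but it follows the documented intent of each policy.

package main

import "fmt"

// pickNUMA sketches how the four policy names could steer NUMA preference.
// load holds per-node utilization in [0,1]; threshold only matters for dynamic_packing.
func pickNUMA(policy string, load []float64, threshold float64) int {
	switch policy {
	case "packing":
		// fill the busiest node that still has headroom, to reduce fragmentation
		best := -1
		for i, l := range load {
			if l < 1 && (best < 0 || l > load[best]) {
				best = i
			}
		}
		return best
	case "spreading":
		// prefer the least-loaded node, to balance load across nodes
		best := 0
		for i, l := range load {
			if l < load[best] {
				best = i
			}
		}
		return best
	case "dynamic_packing":
		// pack until every node crosses the threshold, then fall back to spreading
		for i, l := range load {
			if l < threshold {
				return i
			}
		}
		return pickNUMA("spreading", load, threshold)
	default:
		// "none": no preference; the caller may pick any node that is not overloaded
		return -1
	}
}

func main() {
	load := []float64{0.7, 0.2, 0.9}
	fmt.Println(pickNUMA("dynamic_packing", load, 0.6)) // 1: first node still under the threshold
}
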
34 changes: 18 additions & 16 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
@@ -117,22 +117,24 @@ type DynamicPolicy struct {

// those are parsed from configurations
// todo if we want to use dynamic configuration, we'd better not use self-defined conf
enableCPUAdvisor bool
reservedCPUs machine.CPUSet
cpuAdvisorSocketAbsPath string
cpuPluginSocketAbsPath string
extraStateFileAbsPath string
enableCPUIdle bool
enableSyncingCPUIdle bool
reclaimRelativeRootCgroupPath string
qosConfig *generic.QoSConfiguration
dynamicConfig *dynamicconfig.DynamicAgentConfiguration
podDebugAnnoKeys []string
podAnnotationKeptKeys []string
podLabelKeptKeys []string
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64
enableCPUAdvisor bool
reservedCPUs machine.CPUSet
cpuAdvisorSocketAbsPath string
cpuPluginSocketAbsPath string
extraStateFileAbsPath string
enableCPUIdle bool
enableSyncingCPUIdle bool
reclaimRelativeRootCgroupPath string
qosConfig *generic.QoSConfiguration
dynamicConfig *dynamicconfig.DynamicAgentConfiguration
podDebugAnnoKeys []string
podAnnotationKeptKeys []string
podLabelKeptKeys []string
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64
cpuNUMAReclaimedHintPreferPolicy string
cpuNUMAReclaimedHintPreferLowThreshold float64
}

func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
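
The constructor body is collapsed in this view; presumably NewDynamicPolicy copies the two new config values into these fields. A stand-in sketch of that hop, with trimmed types rather than the real katalyst structs:

package main

import "fmt"

// Trimmed stand-ins for the plugin config and the policy struct above.
type cpuQRMPluginConfig struct {
	CPUNUMAReclaimedHintPreferPolicy       string
	CPUNUMAReclaimedHintPreferLowThreshold float64
}

type dynamicPolicy struct {
	cpuNUMAReclaimedHintPreferPolicy       string
	cpuNUMAReclaimedHintPreferLowThreshold float64
}

// newDynamicPolicy models the config-to-policy copy for the two new fields.
func newDynamicPolicy(conf *cpuQRMPluginConfig) *dynamicPolicy {
	return &dynamicPolicy{
		cpuNUMAReclaimedHintPreferPolicy:       conf.CPUNUMAReclaimedHintPreferPolicy,
		cpuNUMAReclaimedHintPreferLowThreshold: conf.CPUNUMAReclaimedHintPreferLowThreshold,
	}
}

func main() {
	p := newDynamicPolicy(&cpuQRMPluginConfig{
		CPUNUMAReclaimedHintPreferPolicy:       "dynamic_packing",
		CPUNUMAReclaimedHintPreferLowThreshold: 0.5,
	})
	fmt.Println(p.cpuNUMAReclaimedHintPreferPolicy, p.cpuNUMAReclaimedHintPreferLowThreshold)
}
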
65 changes: 37 additions & 28 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go
@@ -565,6 +565,9 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
Difference(dedicatedCPUSet).
Difference(sharedBindingNUMACPUs)

// calculate the set of NUMAs that have no actually numa-binding reclaimed pods
nonReclaimActualBindingNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.WrapAllocationMetaFilter((*commonstate.AllocationMeta).CheckReclaimedActualNUMABinding))

rampUpCPUsTopologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, rampUpCPUs)
if err != nil {
return fmt.Errorf("unable to calculate topologyAwareAssignments for rampUpCPUs, result cpuset: %s, error: %v",
@@ -614,18 +617,8 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
newEntries[podUID][containerName].OriginalAllocationResult = poolCPUSet.Clone()
newEntries[podUID][containerName].TopologyAwareAssignments = topologyAwareAssignments
newEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)
case consts.PodAnnotationQoSLevelSharedCores, consts.PodAnnotationQoSLevelReclaimedCores:
ownerPoolName := allocationInfo.GetOwnerPoolName()
if calculationInfo, ok := resp.GetCalculationInfo(podUID, containerName); ok {
general.Infof("cpu advisor put pod: %s/%s, container: %s from %s to %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, ownerPoolName, calculationInfo.OwnerPoolName)

ownerPoolName = calculationInfo.OwnerPoolName
} else {
general.Warningf("cpu advisor doesn't return entry for pod: %s/%s, container: %s, qosLevel: %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, allocationInfo.QoSLevel)
}

case consts.PodAnnotationQoSLevelSharedCores:
ownerPoolName := p.getOwnerPoolNameFromAdvisor(allocationInfo, resp)
if allocationInfo.RampUp {
general.Infof("pod: %s/%s container: %s is in ramp up, set its allocation result from %s to rampUpCPUs :%s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, allocationInfo.AllocationResult.String(), rampUpCPUs.String())
@@ -641,23 +634,11 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
newEntries[podUID][containerName].OriginalAllocationResult = rampUpCPUs.Clone()
newEntries[podUID][containerName].TopologyAwareAssignments = machine.DeepcopyCPUAssignment(rampUpCPUsTopologyAwareAssignments)
newEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(rampUpCPUsTopologyAwareAssignments)
} else if newEntries[ownerPoolName][commonstate.FakedContainerName] == nil {
errMsg := fmt.Sprintf("cpu advisor doesn't return entry for pool: %s and it's referred by pod: %s/%s, container: %s, qosLevel: %s",
ownerPoolName, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, allocationInfo.QoSLevel)

general.Errorf(errMsg)

_ = p.emitter.StoreInt64(util.MetricNameOrphanContainer, 1, metrics.MetricTypeNameCount,
metrics.MetricTag{Key: "podNamespace", Val: allocationInfo.PodNamespace},
metrics.MetricTag{Key: "podName", Val: allocationInfo.PodName},
metrics.MetricTag{Key: "containerName", Val: allocationInfo.ContainerName},
metrics.MetricTag{Key: "poolName", Val: ownerPoolName})
return fmt.Errorf(errMsg)
} else {
poolEntry := newEntries[ownerPoolName][commonstate.FakedContainerName]

general.Infof("put pod: %s/%s container: %s to pool: %s, set its allocation result from %s to %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, ownerPoolName, allocationInfo.AllocationResult.String(), poolEntry.AllocationResult.String())
poolEntry, err := p.getAllocationPoolEntry(allocationInfo, ownerPoolName, newEntries)
if err != nil {
return err
}

if allocationInfo.CheckSharedNUMABinding() {
poolEntry.QoSLevel = apiconsts.PodAnnotationQoSLevelSharedCores
@@ -668,12 +649,26 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
})
}

general.Infof("put pod: %s/%s container: %s to pool: %s, set its allocation result from %s to %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, ownerPoolName, allocationInfo.AllocationResult.String(), poolEntry.AllocationResult.String())

newEntries[podUID][containerName].OwnerPoolName = ownerPoolName
newEntries[podUID][containerName].AllocationResult = poolEntry.AllocationResult.Clone()
newEntries[podUID][containerName].OriginalAllocationResult = poolEntry.OriginalAllocationResult.Clone()
newEntries[podUID][containerName].TopologyAwareAssignments = machine.DeepcopyCPUAssignment(poolEntry.TopologyAwareAssignments)
newEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(poolEntry.TopologyAwareAssignments)
}
case consts.PodAnnotationQoSLevelReclaimedCores:
ownerPoolName := p.getOwnerPoolNameFromAdvisor(allocationInfo, resp)
poolEntry, err := p.getAllocationPoolEntry(allocationInfo, ownerPoolName, newEntries)
if err != nil {
return err
}

err = p.updateReclaimAllocationResultByPoolEntry(allocationInfo, poolEntry, nonReclaimActualBindingNUMAs)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid qosLevel: %s for pod: %s/%s container: %s",
allocationInfo.QoSLevel, allocationInfo.PodNamespace,
@@ -693,6 +688,20 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
return nil
}

func (p *DynamicPolicy) getOwnerPoolNameFromAdvisor(allocationInfo *state.AllocationInfo, resp *advisorapi.ListAndWatchResponse) string {
ownerPoolName := allocationInfo.GetOwnerPoolName()
if calculationInfo, ok := resp.GetCalculationInfo(allocationInfo.PodUid, allocationInfo.ContainerName); ok {
general.Infof("cpu advisor put pod: %s/%s, container: %s from %s to %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, ownerPoolName, calculationInfo.OwnerPoolName)

ownerPoolName = calculationInfo.OwnerPoolName
} else {
general.Warningf("cpu advisor doesn't return entry for pod: %s/%s, container: %s, qosLevel: %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, allocationInfo.QoSLevel)
}
return ownerPoolName
}

func (p *DynamicPolicy) applyNUMAHeadroom(resp *advisorapi.ListAndWatchResponse) error {
if resp == nil {
return fmt.Errorf("applyNUMAHeadroom got nil resp")
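
Condensed, the refactor splits the old shared/reclaimed case in two: both branches resolve their pool via getOwnerPoolNameFromAdvisor and getAllocationPoolEntry, and only the reclaimed branch reconciles the result against the NUMAs that hold no actually-bound reclaimed pods. The sketch below models that dispatch with stand-in types; updateReclaimAllocationResultByPoolEntry is not shown on this page, so its role appears only as a comment.

package main

import "fmt"

// Stand-ins for AllocationInfo and the advisor response, trimmed to what the dispatch needs.
type allocation struct {
	qosLevel string
	pool     string
}

// advisorPool models getOwnerPoolNameFromAdvisor: take the advisor's pool when it
// returned a calculation entry for this container, else keep the current owner pool.
func advisorPool(cur string, fromAdvisor map[string]string, key string) string {
	if p, ok := fromAdvisor[key]; ok {
		return p
	}
	return cur
}

// apply models the per-QoS switch in applyBlocks after this commit.
func apply(a *allocation, fromAdvisor map[string]string, key string, nonReclaimActualBindingNUMAs []int) error {
	switch a.qosLevel {
	case "shared_cores":
		a.pool = advisorPool(a.pool, fromAdvisor, key)
		// ... copy the pool entry's cpuset into the container entry ...
	case "reclaimed_cores":
		a.pool = advisorPool(a.pool, fromAdvisor, key)
		// ... additionally reconcile the result against nonReclaimActualBindingNUMAs,
		// the work updateReclaimAllocationResultByPoolEntry does in the real code ...
	default:
		return fmt.Errorf("invalid qosLevel: %s", a.qosLevel)
	}
	return nil
}

func main() {
	a := &allocation{qosLevel: "reclaimed_cores", pool: "reclaim"}
	if err := apply(a, map[string]string{"pod/ctr": "reclaim-1"}, "pod/ctr", []int{0, 1}); err != nil {
		panic(err)
	}
	fmt.Println(a.pool) // reclaim-1: the advisor's pool wins when an entry exists
}
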
