Skip to content

Commit

Permalink
YARN-11627. [GPG] Improve GPGPolicyFacade#getPolicyManager. (apache#6332
Browse files Browse the repository at this point in the history
) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored Dec 15, 2023
1 parent cf21f35 commit 7935ede
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
Expand Down Expand Up @@ -187,4 +188,17 @@ private FederationPolicyInitializationContext updateContext(
federationPolicyContext.getHomeSubcluster());
}

/**
* We get the WeightedPolicyInfo of the subCluster.
*
* @return WeightedPolicyInfo.
*/
public abstract WeightedPolicyInfo getWeightedPolicyInfo();

/**
* We set the WeightedPolicyInfo of the subCluster.
*
* @param weightedPolicyInfo weightedPolicyInfo of the subCluster.
*/
public abstract void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
Expand Down Expand Up @@ -115,4 +116,27 @@ SubClusterPolicyConfiguration serializeConf()
*/
void setQueue(String queue);

/**
* This method returns the queue WeightedPolicyInfo
* this policy is configured for.
*
* @return the name of the queue.
*/
WeightedPolicyInfo getWeightedPolicyInfo();

/**
* This methods provides a setter for the queue WeightedPolicyInfo
* this policy is specified for.
*
* @param weightedPolicyInfo weightedPolicyInfo of the subCluster.
*/
void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo);

/**
* PolicyManager Whether to support WeightedPolicyInfo.
* Some of PolicyManagers do not support WeightedPolicyInfo.
* @return true, supports WeightedPolicyInfo;
* false, WeightedPolicyInfo is not supported
*/
boolean isSupportWeightedPolicyInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.hadoop.yarn.server.federation.policies.manager;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;

/**
Expand All @@ -35,4 +37,20 @@ public HashBroadcastPolicyManager() {
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
}

@Override
public WeightedPolicyInfo getWeightedPolicyInfo() {
throw new NotImplementedException(
"HashBroadcastPolicyManager does not implement getWeightedPolicyInfo.");
}

@Override
public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
throw new NotImplementedException(
"HashBroadcastPolicyManager does not implement setWeightedPolicyInfo.");
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
Expand Down Expand Up @@ -58,4 +59,21 @@ public SubClusterPolicyConfiguration serializeConf()
return SubClusterPolicyConfiguration.newInstance(
getQueue(), this.getClass().getCanonicalName(), buf);
}

@Override
public WeightedPolicyInfo getWeightedPolicyInfo() {
throw new NotImplementedException(
"HomePolicyManager does not implement getWeightedPolicyInfo.");
}

@Override
public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
throw new NotImplementedException(
"HomePolicyManager does not implement setWeightedPolicyInfo.");
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@ public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
this.weightedPolicyInfo = weightedPolicyInfo;
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.hadoop.yarn.server.federation.policies.manager;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.RejectAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.router.RejectRouterPolicy;

/**
Expand All @@ -37,4 +39,20 @@ public RejectAllPolicyManager() {
amrmProxyFederationPolicy = RejectAMRMProxyPolicy.class;
}

@Override
public WeightedPolicyInfo getWeightedPolicyInfo() {
throw new NotImplementedException(
"RejectAllPolicyManager does not implement getWeightedPolicyInfo.");
}

@Override
public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
throw new NotImplementedException(
"RejectAllPolicyManager does not implement setWeightedPolicyInfo.");
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.hadoop.yarn.server.federation.policies.manager;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;

/**
Expand All @@ -41,4 +43,20 @@ public UniformBroadcastPolicyManager() {
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
}

@Override
public WeightedPolicyInfo getWeightedPolicyInfo() {
throw new NotImplementedException(
"UniformBroadcastPolicyManager does not implement getWeightedPolicyInfo.");
}

@Override
public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
throw new NotImplementedException(
"UniformBroadcastPolicyManager does not implement setWeightedPolicyInfo.");
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ public void setWeightedPolicyInfo(
WeightedPolicyInfo weightedPolicyInfo) {
this.weightedPolicyInfo = weightedPolicyInfo;
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ public void setWeightedPolicyInfo(
this.weightedPolicyInfo = weightedPolicyInfo;
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
Expand Down Expand Up @@ -135,6 +136,19 @@ public void setQueue(String queue) {

}

@Override
public WeightedPolicyInfo getWeightedPolicyInfo() {
return null;
}

@Override
public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
}

@Override
public boolean isSupportWeightedPolicyInfo() {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
Expand All @@ -32,6 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -100,41 +100,48 @@ public GPGPolicyFacade(FederationStateStoreFacade stateStore,
public FederationPolicyManager getPolicyManager(String queueName)
throws YarnException {
FederationPolicyManager policyManager = policyManagerMap.get(queueName);

// If we don't have the policy manager cached, pull configuration
// from the FederationStateStore to create and cache it
if (policyManager == null) {
try {

// If we don't have the configuration cached, pull it
// from the stateStore
SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);

if (conf == null) {
conf = stateStore.getPolicyConfiguration(queueName);
}

// If configuration is still null, it does not exist in the
// FederationStateStore
if (conf == null) {
LOG.info("Read null policy for queue {}", queueName);
LOG.info("Read null policy for queue {}.", queueName);
return null;
}
policyManager =
FederationPolicyUtils.instantiatePolicyManager(conf.getType());

// Generate PolicyManager based on PolicyManagerType.
String policyManagerType = conf.getType();
policyManager = FederationPolicyUtils.instantiatePolicyManager(policyManagerType);
policyManager.setQueue(queueName);

// TODO there is currently no way to cleanly deserialize a policy
// manager sub type from just the configuration
if (policyManager instanceof WeightedLocalityPolicyManager) {
WeightedPolicyInfo wpinfo =
// If PolicyManager supports Weighted PolicyInfo, it means that
// we need to use this parameter to determine which sub-cluster the router goes to
// or which sub-cluster the container goes to.
if (policyManager.isSupportWeightedPolicyInfo()) {
ByteBuffer weightedPolicyInfoParams = conf.getParams();
if (weightedPolicyInfoParams == null) {
LOG.warn("Warning: Queue = {}, FederationPolicyManager {} WeightedPolicyInfo is empty.",
queueName, policyManagerType);
return null;
}
WeightedPolicyInfo weightedPolicyInfo =
WeightedPolicyInfo.fromByteBuffer(conf.getParams());
WeightedLocalityPolicyManager wlpmanager =
(WeightedLocalityPolicyManager) policyManager;
LOG.info("Updating policy for queue {} to configured weights router: "
+ "{}, amrmproxy: {}", queueName,
wpinfo.getRouterPolicyWeights(),
wpinfo.getAMRMPolicyWeights());
wlpmanager.setWeightedPolicyInfo(wpinfo);
policyManager.setWeightedPolicyInfo(weightedPolicyInfo);
} else {
LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
+ "initialization may be incomplete ", policyManager.getClass());
LOG.warn("Warning: FederationPolicyManager of unsupported WeightedPolicyInfo type {}, " +
"initialization may be incomplete.", policyManager.getClass());
}

policyManagerMap.put(queueName, policyManager);
Expand Down
Loading

0 comments on commit 7935ede

Please sign in to comment.