From 7935eded5ed109bdca58755e233afbb266c73ed9 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 15 Dec 2023 08:36:11 +0800 Subject: [PATCH] YARN-11627. [GPG] Improve GPGPolicyFacade#getPolicyManager. (#6332) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri Signed-off-by: Shilun Fan --- .../manager/AbstractPolicyManager.java | 14 ++ .../manager/FederationPolicyManager.java | 24 +++ .../manager/HashBroadcastPolicyManager.java | 18 ++ .../policies/manager/HomePolicyManager.java | 18 ++ .../PriorityBroadcastPolicyManager.java | 4 + .../manager/RejectAllPolicyManager.java | 18 ++ .../UniformBroadcastPolicyManager.java | 18 ++ .../manager/WeightedHomePolicyManager.java | 5 + .../WeightedLocalityPolicyManager.java | 4 + ...nPolicyInitializationContextValidator.java | 14 ++ .../GPGPolicyFacade.java | 41 +++-- .../TestGPGPolicyFacade.java | 155 ++++++++++++++++++ .../TestSequentialBroadcastPolicyManager.java | 15 ++ 13 files changed, 331 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java index aa0742d090c2a..f78555e2a477c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; @@ -187,4 +188,17 @@ private FederationPolicyInitializationContext updateContext( federationPolicyContext.getHomeSubcluster()); } + /** + * We get the WeightedPolicyInfo of the subCluster. + * + * @return WeightedPolicyInfo. + */ + public abstract WeightedPolicyInfo getWeightedPolicyInfo(); + + /** + * We set the WeightedPolicyInfo of the subCluster. + * + * @param weightedPolicyInfo weightedPolicyInfo of the subCluster. + */ + public abstract void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java index 3aeb7d718e2d4..3e1b38dc5ecf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java @@ -19,6 +19,7 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; @@ -115,4 +116,27 @@ SubClusterPolicyConfiguration serializeConf() */ void setQueue(String queue); + /** + * This method returns the queue WeightedPolicyInfo + * this policy is configured for. + * + * @return the name of the queue. + */ + WeightedPolicyInfo getWeightedPolicyInfo(); + + /** + * This methods provides a setter for the queue WeightedPolicyInfo + * this policy is specified for. + * + * @param weightedPolicyInfo weightedPolicyInfo of the subCluster. + */ + void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo); + + /** + * PolicyManager Whether to support WeightedPolicyInfo. + * Some of PolicyManagers do not support WeightedPolicyInfo. + * @return true, supports WeightedPolicyInfo; + * false, WeightedPolicyInfo is not supported + */ + boolean isSupportWeightedPolicyInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java index 08ab08fedf42b..0f54491e5a0bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java @@ -17,7 +17,9 @@ package org.apache.hadoop.yarn.server.federation.policies.manager; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy; /** @@ -35,4 +37,20 @@ public HashBroadcastPolicyManager() { amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; } + @Override + public WeightedPolicyInfo getWeightedPolicyInfo() { + throw new NotImplementedException( + "HashBroadcastPolicyManager does not implement getWeightedPolicyInfo."); + } + + @Override + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + throw new NotImplementedException( + "HashBroadcastPolicyManager does not implement setWeightedPolicyInfo."); + } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java index 93aa248864236..48d4b76f14ab1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HomePolicyManager.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.Collections; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; @@ -58,4 +59,21 @@ public SubClusterPolicyConfiguration serializeConf() return SubClusterPolicyConfiguration.newInstance( getQueue(), this.getClass().getCanonicalName(), buf); } + + @Override + public WeightedPolicyInfo getWeightedPolicyInfo() { + throw new NotImplementedException( + "HomePolicyManager does not implement getWeightedPolicyInfo."); + } + + @Override + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + throw new NotImplementedException( + "HomePolicyManager does not implement setWeightedPolicyInfo."); + } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java index 4448c07fdc4ae..c46abad7dde44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java @@ -63,4 +63,8 @@ public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { this.weightedPolicyInfo = weightedPolicyInfo; } + @Override + public boolean isSupportWeightedPolicyInfo() { + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/RejectAllPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/RejectAllPolicyManager.java index 290b67abf550d..de9894eb01e70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/RejectAllPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/RejectAllPolicyManager.java @@ -17,7 +17,9 @@ package org.apache.hadoop.yarn.server.federation.policies.manager; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.RejectAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.router.RejectRouterPolicy; /** @@ -37,4 +39,20 @@ public RejectAllPolicyManager() { amrmProxyFederationPolicy = RejectAMRMProxyPolicy.class; } + @Override + public WeightedPolicyInfo getWeightedPolicyInfo() { + throw new NotImplementedException( + "RejectAllPolicyManager does not implement getWeightedPolicyInfo."); + } + + @Override + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + throw new NotImplementedException( + "RejectAllPolicyManager does not implement setWeightedPolicyInfo."); + } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java index 5db0466bd89b9..aba6bf10b29a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java @@ -17,7 +17,9 @@ package org.apache.hadoop.yarn.server.federation.policies.manager; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; /** @@ -41,4 +43,20 @@ public UniformBroadcastPolicyManager() { amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; } + @Override + public WeightedPolicyInfo getWeightedPolicyInfo() { + throw new NotImplementedException( + "UniformBroadcastPolicyManager does not implement getWeightedPolicyInfo."); + } + + @Override + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + throw new NotImplementedException( + "UniformBroadcastPolicyManager does not implement setWeightedPolicyInfo."); + } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java index 370594ec32d36..215f7e4cd7236 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java @@ -62,4 +62,9 @@ public void setWeightedPolicyInfo( WeightedPolicyInfo weightedPolicyInfo) { this.weightedPolicyInfo = weightedPolicyInfo; } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java index 0706b6623944b..dde7ccb7ee8b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java @@ -65,4 +65,8 @@ public void setWeightedPolicyInfo( this.weightedPolicyInfo = weightedPolicyInfo; } + @Override + public boolean isSupportWeightedPolicyInfo() { + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index 91b025f75f35f..fbc3ffcb7f900 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; @@ -135,6 +136,19 @@ public void setQueue(String queue) { } + @Override + public WeightedPolicyInfo getWeightedPolicyInfo() { + return null; + } + + @Override + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return false; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java index 9a9de440466cf..78ce60c621b0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; @@ -32,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -100,41 +100,48 @@ public GPGPolicyFacade(FederationStateStoreFacade stateStore, public FederationPolicyManager getPolicyManager(String queueName) throws YarnException { FederationPolicyManager policyManager = policyManagerMap.get(queueName); + // If we don't have the policy manager cached, pull configuration // from the FederationStateStore to create and cache it if (policyManager == null) { try { + // If we don't have the configuration cached, pull it // from the stateStore SubClusterPolicyConfiguration conf = policyConfMap.get(queueName); + if (conf == null) { conf = stateStore.getPolicyConfiguration(queueName); } + // If configuration is still null, it does not exist in the // FederationStateStore if (conf == null) { - LOG.info("Read null policy for queue {}", queueName); + LOG.info("Read null policy for queue {}.", queueName); return null; } - policyManager = - FederationPolicyUtils.instantiatePolicyManager(conf.getType()); + + // Generate PolicyManager based on PolicyManagerType. + String policyManagerType = conf.getType(); + policyManager = FederationPolicyUtils.instantiatePolicyManager(policyManagerType); policyManager.setQueue(queueName); - // TODO there is currently no way to cleanly deserialize a policy - // manager sub type from just the configuration - if (policyManager instanceof WeightedLocalityPolicyManager) { - WeightedPolicyInfo wpinfo = + // If PolicyManager supports Weighted PolicyInfo, it means that + // we need to use this parameter to determine which sub-cluster the router goes to + // or which sub-cluster the container goes to. + if (policyManager.isSupportWeightedPolicyInfo()) { + ByteBuffer weightedPolicyInfoParams = conf.getParams(); + if (weightedPolicyInfoParams == null) { + LOG.warn("Warning: Queue = {}, FederationPolicyManager {} WeightedPolicyInfo is empty.", + queueName, policyManagerType); + return null; + } + WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(conf.getParams()); - WeightedLocalityPolicyManager wlpmanager = - (WeightedLocalityPolicyManager) policyManager; - LOG.info("Updating policy for queue {} to configured weights router: " - + "{}, amrmproxy: {}", queueName, - wpinfo.getRouterPolicyWeights(), - wpinfo.getAMRMPolicyWeights()); - wlpmanager.setWeightedPolicyInfo(wpinfo); + policyManager.setWeightedPolicyInfo(weightedPolicyInfo); } else { - LOG.warn("Warning: FederationPolicyManager of unsupported type {}, " - + "initialization may be incomplete ", policyManager.getClass()); + LOG.warn("Warning: FederationPolicyManager of unsupported WeightedPolicyInfo type {}, " + + "initialization may be incomplete.", policyManager.getClass()); } policyManagerMap.put(queueName, policyManager); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java index d5408dfdafbc5..217c4a58967fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java @@ -23,11 +23,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.HashBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.HomePolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.RejectAllPolicyManager; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; @@ -35,6 +43,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.junit.After; import org.junit.Assert; @@ -42,6 +51,10 @@ import org.junit.Test; import org.mockito.Matchers; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -198,4 +211,146 @@ public void testReadOnly() throws YarnException { Matchers.any(SetSubClusterPolicyConfigurationRequest.class)); } + @Test + public void testGetWeightedLocalityPolicyManager() throws YarnException { + stateStore = new MemoryFederationStateStore(); + stateStore.init(new Configuration()); + + // root.a uses WeightedLocalityPolicyManager. + // Step1. Prepare amRMPolicyWeights and routerPolicyWeights + Map amrmPolicyWeights = new HashMap<>(); + amrmPolicyWeights.put(new SubClusterIdInfo("SC-1"), 0.7f); + amrmPolicyWeights.put(new SubClusterIdInfo("SC-2"), 0.3f); + + Map routerPolicyWeights = new HashMap<>(); + routerPolicyWeights.put(new SubClusterIdInfo("SC-1"), 0.6f); + routerPolicyWeights.put(new SubClusterIdInfo("SC-2"), 0.4f); + + WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo(); + weightedPolicyInfo.setHeadroomAlpha(1); + weightedPolicyInfo.setAMRMPolicyWeights(amrmPolicyWeights); + weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights); + + // Step2. Set PolicyConfiguration. + String policyManagerType = WeightedLocalityPolicyManager.class.getName(); + SubClusterPolicyConfiguration config = SubClusterPolicyConfiguration.newInstance("root.a", + policyManagerType, weightedPolicyInfo.toByteBuffer()); + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest.newInstance(config); + stateStore.setPolicyConfiguration(request); + + // Step3. Get FederationPolicyManager using policyFacade. + facade.reinitialize(stateStore, conf); + policyFacade = new GPGPolicyFacade(facade, conf); + FederationPolicyManager policyManager = policyFacade.getPolicyManager("root.a"); + Assert.assertNotNull(policyManager); + Assert.assertTrue(policyManager.isSupportWeightedPolicyInfo()); + WeightedPolicyInfo weightedPolicyInfo1 = policyManager.getWeightedPolicyInfo(); + Assert.assertNotNull(weightedPolicyInfo1); + Assert.assertTrue(policyManager instanceof WeightedLocalityPolicyManager); + + // Step4. Confirm amrmPolicyWeight is accurate. + Map amrmPolicyWeights1 = weightedPolicyInfo1.getAMRMPolicyWeights(); + Assert.assertNotNull(amrmPolicyWeights1); + Float sc1Float = amrmPolicyWeights1.get(new SubClusterIdInfo("SC-1")); + Float sc2Float = amrmPolicyWeights1.get(new SubClusterIdInfo("SC-2")); + Assert.assertEquals(0.7, sc1Float, 0.001); + Assert.assertEquals(0.3, sc2Float, 0.001); + + // Step5. Confirm amrmPolicyWeight is accurate. + Map routerPolicyWeights1 = + weightedPolicyInfo1.getRouterPolicyWeights(); + Assert.assertNotNull(routerPolicyWeights1); + Float sc1Float1 = routerPolicyWeights1.get(new SubClusterIdInfo("SC-1")); + Float sc2Float2 = routerPolicyWeights1.get(new SubClusterIdInfo("SC-2")); + Assert.assertEquals(0.6, sc1Float1, 0.001); + Assert.assertEquals(0.4, sc2Float2, 0.001); + } + + @Test + public void testGetWeightedHomePolicyManager() throws YarnException { + stateStore = new MemoryFederationStateStore(); + stateStore.init(new Configuration()); + + // root.b uses WeightedHomePolicyManager. + // Step1. Prepare routerPolicyWeights. + Map routerPolicyWeights = new HashMap<>(); + routerPolicyWeights.put(new SubClusterIdInfo("SC-1"), 0.8f); + routerPolicyWeights.put(new SubClusterIdInfo("SC-2"), 0.2f); + + WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo(); + weightedPolicyInfo.setHeadroomAlpha(1); + weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights); + + // Step2. Set PolicyConfiguration. + String policyManagerType = WeightedHomePolicyManager.class.getName(); + SubClusterPolicyConfiguration config = SubClusterPolicyConfiguration.newInstance("root.b", + policyManagerType, weightedPolicyInfo.toByteBuffer()); + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest.newInstance(config); + stateStore.setPolicyConfiguration(request); + + // Step3. Get FederationPolicyManager using policyFacade. + facade.reinitialize(stateStore, conf); + policyFacade = new GPGPolicyFacade(facade, conf); + FederationPolicyManager policyManager = policyFacade.getPolicyManager("root.b"); + Assert.assertNotNull(policyManager); + Assert.assertTrue(policyManager.isSupportWeightedPolicyInfo()); + WeightedPolicyInfo weightedPolicyInfo1 = policyManager.getWeightedPolicyInfo(); + Assert.assertNotNull(weightedPolicyInfo1); + + // Step4. Confirm amrmPolicyWeight is accurate. + Map amrmPolicyWeights1 = weightedPolicyInfo1.getAMRMPolicyWeights(); + Assert.assertNotNull(amrmPolicyWeights1); + Assert.assertEquals(0, amrmPolicyWeights1.size()); + + // Step5. Confirm amrmPolicyWeight is accurate. + Map routerPolicyWeights1 = + weightedPolicyInfo1.getRouterPolicyWeights(); + Assert.assertNotNull(routerPolicyWeights1); + Float sc1Float1 = routerPolicyWeights1.get(new SubClusterIdInfo("SC-1")); + Float sc2Float2 = routerPolicyWeights1.get(new SubClusterIdInfo("SC-2")); + Assert.assertEquals(0.8, sc1Float1, 0.001); + Assert.assertEquals(0.2, sc2Float2, 0.001); + } + + @Test + public void testGetUniformBroadcastPolicyManager() throws Exception { + stateStore = new MemoryFederationStateStore(); + stateStore.init(new Configuration()); + + List notSupportWeightedPolicyInfos = new ArrayList<>(); + notSupportWeightedPolicyInfos.add(HashBroadcastPolicyManager.class.getName()); + notSupportWeightedPolicyInfos.add(UniformBroadcastPolicyManager.class.getName()); + notSupportWeightedPolicyInfos.add(HomePolicyManager.class.getName()); + notSupportWeightedPolicyInfos.add(RejectAllPolicyManager.class.getName()); + String prefix = "org.apache.hadoop.yarn.server.federation.policies.manager."; + + for (String policyManagerType : notSupportWeightedPolicyInfos) { + // root.c uses UniformBroadcastPolicyManager. + // Step1. Prepare routerPolicyWeights. + WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo(); + weightedPolicyInfo.setHeadroomAlpha(1); + + // Step2. Set PolicyConfiguration. + SubClusterPolicyConfiguration config = SubClusterPolicyConfiguration.newInstance("root.c", + policyManagerType, weightedPolicyInfo.toByteBuffer()); + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest.newInstance(config); + stateStore.setPolicyConfiguration(request); + + // Step3. Get FederationPolicyManager using policyFacade. + facade.reinitialize(stateStore, conf); + policyFacade = new GPGPolicyFacade(facade, conf); + FederationPolicyManager policyManager = policyFacade.getPolicyManager("root.c"); + Assert.assertNotNull(policyManager); + Assert.assertFalse(policyManager.isSupportWeightedPolicyInfo()); + String policyManagerTypeSimple = policyManagerType.replace(prefix, ""); + // Verify that PolicyManager is initialized successfully, + // but getWeightedPolicyInfo is not supported. + LambdaTestUtils.intercept(NotImplementedException.class, + policyManagerTypeSimple + " does not implement getWeightedPolicyInfo.", + () -> policyManager.getWeightedPolicyInfo()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java index dfa8c7136d76c..ffc702f64eac3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.clientrm; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager; /** @@ -36,4 +37,18 @@ public TestSequentialBroadcastPolicyManager() { routerFederationPolicy = TestSequentialRouterPolicy.class; amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; } + + @Override + public WeightedPolicyInfo getWeightedPolicyInfo() { + return null; + } + + @Override + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + } + + @Override + public boolean isSupportWeightedPolicyInfo() { + return false; + } }