From 2d8792bccd9a498c9752dbe2c2ea5b134f39ac58 Mon Sep 17 00:00:00 2001 From: hanbingleixue Date: Fri, 3 Jan 2025 13:03:31 +0800 Subject: [PATCH] Fix some bug in xds flow control Signed-off-by: hanbingleixue --- .../mongodb/action.yml | 2 +- .../service/xds/entity/XdsRetryPolicy.java | 25 +++----- .../xds/entity/XdsRetryPolicyTest.java | 8 +-- .../xds/utils/RdsProtocolTransformer.java | 9 +-- .../common/handler/retry/AbstractRetry.java | 54 +++++++++++++++- .../common/handler/retry/Retry.java | 26 +++++++- .../common/util/XdsRouterUtils.java | 5 +- .../common/util/XdsThreadLocalUtil.java | 56 +++++++++++++++++ .../xds/circuit/XdsCircuitBreakerManager.java | 4 +- .../handler/retry/RetryContextTest.java | 13 +++- .../common/util/XdsAbstractTest.java | 2 +- .../common/xds/handler/XdsHandlerTest.java | 2 +- .../AbstractXdsHttpClientInterceptor.java | 14 ++++- .../retry/AlibabaDubboInvokerInterceptor.java | 2 +- .../retry/ApacheDubboInvokerInterceptor.java | 2 +- .../retry/client/HttpClient4xInterceptor.java | 1 + .../HttpUrlConnectionConnectInterceptor.java | 34 +++++++++- ...ttpUrlConnectionDisconnectInterceptor.java | 24 ++++++- ...rlConnectionResponseStreamInterceptor.java | 60 +++++++++++++----- .../client/OkHttp3ClientInterceptor.java | 2 + ...HttpClientInterceptorChainInterceptor.java | 1 + .../cluster/AlibabaDubboClusterInvoker.java | 2 +- .../cluster/ApacheDubboClusterInvoker.java | 2 +- .../handler/DefaultRetryPredicateCreator.java | 62 ++----------------- .../retry/handler/RetryHandlerV2.java | 6 +- .../retry/handler/RetryPredicateCreator.java | 4 +- .../DefaultRetryPredicateCreatorTest.java | 13 +++- .../XdsBusinessServerRequestHandler.java | 7 ++- .../chain/handler/XdsFaultRequestHandler.java | 3 +- .../handler/XdsRateLimitRequestHandler.java | 3 +- 30 files changed, 318 insertions(+), 130 deletions(-) diff --git a/.github/actions/scenarios/database-write-prohibition/mongodb/action.yml b/.github/actions/scenarios/database-write-prohibition/mongodb/action.yml index 407d1b7ea4..9f7013444c 100644 --- a/.github/actions/scenarios/database-write-prohibition/mongodb/action.yml +++ b/.github/actions/scenarios/database-write-prohibition/mongodb/action.yml @@ -11,7 +11,7 @@ runs: shell: bash run: | sudo apt-get install gnupg curl - curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc | \ + curl -fsSL -k https://www.mongodb.org/static/pgp/server-7.0.asc | \ sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg \ --dearmor echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsRetryPolicy.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsRetryPolicy.java index c4e031efd2..e31abbf585 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsRetryPolicy.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/service/xds/entity/XdsRetryPolicy.java @@ -16,6 +16,8 @@ package io.sermant.core.service.xds.entity; +import java.util.List; + /** * Xds Retry Policy information * @@ -37,12 +39,7 @@ public class XdsRetryPolicy { /** * Specifies the conditions under which retry takes place */ - private String retryOn; - - /** - * Specifies the conditions under which retry takes place - */ - private String retryHostPredicate; + private List retryConditions; public long getMaxAttempts() { return maxAttempts; @@ -60,19 +57,11 @@ public void setPerTryTimeout(long perTryTimeout) { this.perTryTimeout = perTryTimeout; } - public String getRetryOn() { - return retryOn; - } - - public void setRetryOn(String retryOn) { - this.retryOn = retryOn; - } - - public String getRetryHostPredicate() { - return retryHostPredicate; + public List getRetryConditions() { + return retryConditions; } - public void setRetryHostPredicate(String retryHostPredicate) { - this.retryHostPredicate = retryHostPredicate; + public void setRetryConditions(List retryConditions) { + this.retryConditions = retryConditions; } } diff --git a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsRetryPolicyTest.java b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsRetryPolicyTest.java index def6954bcf..235345ccd3 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsRetryPolicyTest.java +++ b/sermant-agentcore/sermant-agentcore-core/src/test/java/io/sermant/core/service/xds/entity/XdsRetryPolicyTest.java @@ -19,6 +19,8 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Collections; + /** * XdsRetryPolicyTest * @@ -29,12 +31,10 @@ public class XdsRetryPolicyTest { @Test public void testXdsRetryPolicy() { XdsRetryPolicy xdsRetryPolicy = new XdsRetryPolicy(); - xdsRetryPolicy.setRetryHostPredicate("PreviousHostsPredicate"); - xdsRetryPolicy.setRetryOn("503"); + xdsRetryPolicy.setRetryConditions(Collections.singletonList("503")); xdsRetryPolicy.setMaxAttempts(8); xdsRetryPolicy.setPerTryTimeout(2000); - Assert.assertEquals("PreviousHostsPredicate", xdsRetryPolicy.getRetryHostPredicate()); - Assert.assertEquals("503", xdsRetryPolicy.getRetryOn()); + Assert.assertEquals("503", xdsRetryPolicy.getRetryConditions().get(0)); Assert.assertEquals(8, xdsRetryPolicy.getMaxAttempts()); Assert.assertEquals(2000, xdsRetryPolicy.getPerTryTimeout()); } diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java index 4e750e6c3e..bbe71b730f 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/io/sermant/implement/service/xds/utils/RdsProtocolTransformer.java @@ -38,6 +38,7 @@ import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; import io.envoyproxy.envoy.type.matcher.v3.StringMatcher; import io.envoyproxy.envoy.type.v3.FractionalPercent; +import io.sermant.core.common.CommonConstant; import io.sermant.core.common.LoggerFactory; import io.sermant.core.service.xds.entity.XdsAbort; import io.sermant.core.service.xds.entity.XdsDelay; @@ -68,6 +69,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -264,13 +266,12 @@ private static XdsClusterWeight parseClusterWeight(ClusterWeight clusterWeight) private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) { XdsRetryPolicy xdsRetryPolicy = new XdsRetryPolicy(); - xdsRetryPolicy.setRetryOn(retryPolicy.getRetryOn()); + if (!StringUtils.isEmpty(retryPolicy.getRetryOn())) { + xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA))); + } xdsRetryPolicy.setMaxAttempts(retryPolicy.getHostSelectionRetryMaxAttempts()); long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis(); xdsRetryPolicy.setPerTryTimeout(perTryTimeout); - if (retryPolicy.getRetryHostPredicateCount() != 0) { - xdsRetryPolicy.setRetryHostPredicate(retryPolicy.getRetryHostPredicate(0).getName()); - } return xdsRetryPolicy; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java index d0ca324c2c..83ed5af6ec 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/AbstractRetry.java @@ -19,8 +19,12 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.utils.CollectionUtils; import io.sermant.flowcontrol.common.config.FlowControlConfig; import io.sermant.flowcontrol.common.support.ReflectMethodCacheSupport; +import io.sermant.flowcontrol.common.xds.retry.RetryCondition; +import io.sermant.flowcontrol.common.xds.retry.RetryConditionType; import java.util.ArrayList; import java.util.List; @@ -63,7 +67,7 @@ protected final Class[] findClass(String[] classNames) { } @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { if (result == null) { return false; } @@ -71,8 +75,54 @@ public boolean needRetry(Set statusList, Object result) { return code.filter(statusList::contains).isPresent(); } + @Override + public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) { + if (result == null) { + return false; + } + List conditions = retryPolicy.getRetryConditions(); + if (CollectionUtils.isEmpty(conditions)) { + return false; + } + Optional statusCodeOptional = this.getCode(result); + if (!statusCodeOptional.isPresent()) { + return false; + } + String statusCode = statusCodeOptional.get(); + if (conditions.contains(statusCode)) { + return true; + } + for (String conditionName : conditions) { + Optional retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName); + if (!retryConditionOptional.isPresent()) { + continue; + } + if (retryConditionOptional.get().needRetry(this, null, statusCode, result)) { + return true; + } + } + return false; + } + + @Override + public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) { + if (ex == null) { + return false; + } + for (String conditionName : retryPolicy.getRetryConditions()) { + Optional retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName); + if (!retryConditionOptional.isPresent()) { + continue; + } + if (retryConditionOptional.get().needRetry(null, ex, null, null)) { + return true; + } + } + return false; + } + /** - * implemented by subclasses, if subclass implement {@link #needRetry(Set, Object)}, no need to implement the get + * implemented by subclasses, if subclass implement {@link #isNeedRetry(Set, Object)}, no need to implement the get * code method * * @param result interface response result diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java index e3db641444..ffb2139fb7 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/handler/retry/Retry.java @@ -17,6 +17,8 @@ package io.sermant.flowcontrol.common.handler.retry; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; + import java.util.Optional; import java.util.Set; @@ -28,13 +30,33 @@ */ public interface Retry { /** - * needToRetry + * Retry based on the request result. Retrying is required if the request status is in the statusList. * * @param statusList List of status codes, valid only for http applications * @param result responseResult * @return retryOrNot */ - boolean needRetry(Set statusList, Object result); + boolean isNeedRetry(Set statusList, Object result); + + /** + * Retry based on the request result. If the request result meets the retry conditions in the retry policy, + * a retry will be executed + * + * @param result responseResult + * @param retryPolicy retry policy information + * @return retryOrNot + */ + boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy); + + /** + * Retry based on the throwable. If the throwable during the execution of the request method meets the retry + * conditions in the retry policy, a retry will be executed + * + * @param throwable Exception thrown during retry + * @param retryPolicy Xds Retry Policy information + * @return retryOrNot + */ + boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy); /** * define which exceptions need to be retried diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java index 517f3a01d3..b9ac6301e1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsRouterUtils.java @@ -55,12 +55,13 @@ private XdsRouterUtils() { */ public static Optional getLocalityInfoOfSelfService() { if (localityObtainedFlag) { - return Optional.of(selfServiceLocality); + return Optional.ofNullable(selfServiceLocality); } synchronized (XdsRouterUtils.class) { if (localityObtainedFlag) { - return Optional.of(selfServiceLocality); + return Optional.ofNullable(selfServiceLocality); } + localityObtainedFlag = true; String podIp = NetworkUtils.getKubernetesPodIp(); if (StringUtils.isEmpty(podIp)) { return Optional.empty(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java index f6912bf840..97de9680a1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/util/XdsThreadLocalUtil.java @@ -18,6 +18,8 @@ import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import java.net.HttpURLConnection; + /** * xds thread local utility class * @@ -29,6 +31,10 @@ public class XdsThreadLocalUtil { private static final ThreadLocal FLOW_CONTROL_SCENARIO_THREAD_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal CONNECTION_IN_CURRENT_THREAD_LOCAL = new ThreadLocal<>(); + + private static final ThreadLocal IS_CONNECTED = new ThreadLocal<>(); + private XdsThreadLocalUtil() { } @@ -57,6 +63,31 @@ public static void removeSendByteFlag() { SEND_BYTE_FLAG.remove(); } + /** + * save the instance object of HttpURLConnection + * + * @param connection the instance object of HttpURLConnection + */ + public static void saveHttpUrlConnection(HttpURLConnection connection) { + CONNECTION_IN_CURRENT_THREAD_LOCAL.set(connection); + } + + /** + * get the instance object of HttpURLConnection + * + * @return the instance object of HttpURLConnection + */ + public static HttpURLConnection getHttpUrlConnection() { + return CONNECTION_IN_CURRENT_THREAD_LOCAL.get(); + } + + /** + * remove the instance object of HttpURLConnection + */ + public static void removeHttpUrlConnection() { + CONNECTION_IN_CURRENT_THREAD_LOCAL.remove(); + } + /** * Set scenario information * @@ -81,4 +112,29 @@ public static FlowControlScenario getScenarioInfo() { public static void removeScenarioInfo() { FLOW_CONTROL_SCENARIO_THREAD_LOCAL.remove(); } + + /** + * Set the connection status of the httpUrlConnection + * + * @param executeStatus the execution status of the connect method, true: executed, false: Not executed + */ + public static void setConnectionStatus(boolean executeStatus) { + IS_CONNECTED.set(executeStatus); + } + + /** + * Is it already connected + * + * @return connection status + */ + public static boolean isConnected() { + return IS_CONNECTED.get() != null && IS_CONNECTED.get(); + } + + /** + * remove the connection status of the httpUrlConnection + */ + public static void removeConnectionStatus() { + IS_CONNECTED.remove(); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java index dc3cc8186b..c8c720890f 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/xds/circuit/XdsCircuitBreakerManager.java @@ -100,7 +100,7 @@ public static void setCircuitBeakerStatus(XdsInstanceCircuitBreakers circuitBrea XdsCircuitBreakerInfo circuitBreakerInfo = getCircuitBreakerInfo(scenarioInfo.getServiceName(), scenarioInfo.getRouteName(), scenarioInfo.getAddress()); if (!XdsThreadLocalUtil.getSendByteFlag() && circuitBreakers.isSplitExternalLocalOriginErrors() - && shouldCircuitBreakerByFailure(circuitBreakerInfo.getGateWayFailure(), + && shouldCircuitBreakerByFailure(circuitBreakerInfo.getLocalFailure(), circuitBreakers.getConsecutiveLocalOriginFailure(), circuitBreakers.getInterval())) { openCircuitBreaker(circuitBreakerInfo, circuitBreakers.getInterval()); } @@ -178,7 +178,7 @@ private static void recordRequestTime(Deque times, int failureRequestThres if (failureRequestThreshold <= 0) { return; } - for (int i = times.size(); i >= failureRequestThreshold - 1 && !times.isEmpty(); i--) { + for (int i = times.size(); i >= failureRequestThreshold && !times.isEmpty(); i--) { times.removeFirst(); } times.add(currentTime); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java index eae232f739..3509cf6cb3 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/handler/retry/RetryContextTest.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol.common.handler.retry; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.flowcontrol.common.core.rule.RetryRule; import org.junit.Assert; @@ -75,7 +76,17 @@ private void muteRetry(Object instance) { private Retry buildRetry() { return new Retry() { @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { + return false; + } + + @Override + public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) { + return false; + } + + @Override + public boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy) { return false; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java index 4d3d2ac672..fbf42d124d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/util/XdsAbstractTest.java @@ -124,7 +124,7 @@ public void setUp() { Mockito.when(xdsCoreService.getLoadBalanceService()).thenReturn(xdsLoadBalanceService); requestCircuitBreakers.setMaxRequests(1000); instanceCircuitBreakers.setInterval(1000); - retryPolicy.setRetryOn("503"); + retryPolicy.setRetryConditions(Collections.singletonList("503")); rateLimit.setResponseHeaderOption(Collections.singletonList(new XdsHeaderOption())); XdsDelay delay = new XdsDelay(); delay.setFixedDelay(1000); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java index 756ae137e2..f977a2f68a 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/test/java/io/sermant/flowcontrol/common/xds/handler/XdsHandlerTest.java @@ -63,7 +63,7 @@ public void testGetRetryPolicy() { final Optional result = XdsHandler.INSTANCE.getRetryPolicy( SERVICE_NAME, ROUTE_NAME); Assert.assertTrue(result.isPresent()); - Assert.assertEquals(retryPolicy.getRetryOn(), result.get().getRetryOn()); + Assert.assertEquals(retryPolicy.getRetryConditions().get(0), result.get().getRetryConditions().get(0)); } @Test diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java index d05e2f345f..50e67f9d27 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java @@ -132,6 +132,12 @@ public void executeWithRetryPolicy(ExecuteContext context) { if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { // execute retry logic result = handlers.get(0).executeCheckedSupplier(retryFunc::get); + context.skip(result); + return; + } + if (ex != null) { + context.setThrowableOut(getRealCause(ex)); + return; } context.skip(result); } catch (Throwable throwable) { @@ -211,10 +217,14 @@ private void handleFailedRequests(FlowControlScenario scenario, int statusCode) */ protected Optional chooseServiceInstanceForXds() { FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); - if (scenarioInfo == null || io.sermant.core.utils.StringUtils.isBlank(scenarioInfo.getServiceName()) - || io.sermant.core.utils.StringUtils.isEmpty(scenarioInfo.getClusterName())) { + if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName())) { return Optional.empty(); } + if (StringUtils.isEmpty(scenarioInfo.getClusterName())) { + scenarioInfo.setClusterName(StringUtils.EMPTY); + return Optional.ofNullable(chooseServiceInstanceByLoadBalancer( + XdsHandler.INSTANCE.getServiceInstanceByServiceName(scenarioInfo.getServiceName()), scenarioInfo)); + } Set serviceInstanceSet = XdsHandler.INSTANCE. getMatchedServiceInstance(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); if (serviceInstanceSet.isEmpty()) { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java index 3f2d03b5fb..9fdc29e1fa 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/AlibabaDubboInvokerInterceptor.java @@ -227,7 +227,7 @@ protected boolean canInvoke(ExecuteContext context) { */ public static class AlibabaDubboRetry extends AbstractRetry { @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { // dubbo does not support status codes return false; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java index 9e71558d4f..1c17dd1b36 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/ApacheDubboInvokerInterceptor.java @@ -247,7 +247,7 @@ private Object buildErrorResponse(Throwable throwable, Invocation invocation) { */ public static class ApacheDubboRetry extends AbstractRetry { @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { // dubbo does not support status codes return false; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java index 089cde0016..a9cbdab491 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java @@ -106,6 +106,7 @@ public ExecuteContext doBefore(ExecuteContext context) { if (isNeedCircuitBreak()) { context.skip(new ErrorCloseableHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, MESSAGE, httpRequest.getProtocolVersion())); + return context; } // Execute service invocation and retry logic diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java index d70e17a5d0..d539e87ca1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java @@ -19,6 +19,7 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.MapUtils; import io.sermant.core.utils.ReflectUtils; @@ -28,6 +29,7 @@ import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; import io.sermant.flowcontrol.common.util.XdsRouterUtils; import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; @@ -58,8 +60,7 @@ public class HttpUrlConnectionConnectInterceptor extends AbstractXdsHttpClientIn * Constructor */ public HttpUrlConnectionConnectInterceptor() { - super(new HttpUrlConnectionResponseStreamInterceptor.HttpUrlConnectionRetry(), - HttpUrlConnectionConnectInterceptor.class.getName()); + super(new HttpUrlConnectionRetry(), HttpUrlConnectionConnectInterceptor.class.getName()); } @Override @@ -98,6 +99,13 @@ public ExecuteContext doBefore(ExecuteContext context) { return context; } + // Save the identifier for the executed connect method, and enhance the HttpURLConnection's getInputStream only + // after the connect method has been executed. + XdsThreadLocalUtil.setConnectionStatus(true); + + // Save the HttpURLConnection to facilitate retrieving the response status code during retries + XdsThreadLocalUtil.saveHttpUrlConnection(connection); + // Execute service invocation and retry logic executeWithRetryPolicy(context); return context; @@ -210,4 +218,26 @@ private Proxy getProxy(URL newUrl) { private Proxy createProxy(URL newUrl) { return new Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(newUrl.getHost(), newUrl.getPort())); } + + /** + * Http url connection retry + * + * @since 2024-12-31 + */ + public static class HttpUrlConnectionRetry extends AbstractRetry { + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + + @Override + public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) { + return false; + } + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java index f8aa44d67b..4bf752cce3 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java @@ -42,12 +42,29 @@ public HttpUrlConnectionDisconnectInterceptor() { @Override public ExecuteContext doBefore(ExecuteContext context) throws Exception { - XdsThreadLocalUtil.removeSendByteFlag(); HttpURLConnection httpUrlConnection = (HttpURLConnection) context.getObject(); context.setLocalFieldValue(STATUS_CODE, httpUrlConnection.getResponseCode()); return context; } + @Override + public ExecuteContext doAfter(ExecuteContext context) { + XdsThreadLocalUtil.removeConnectionStatus(); + XdsThreadLocalUtil.removeHttpUrlConnection(); + super.doAfter(context); + return context; + } + + @Override + public ExecuteContext doThrow(ExecuteContext context) { + if (context.getThrowableOut() != null) { + XdsThreadLocalUtil.removeConnectionStatus(); + XdsThreadLocalUtil.removeHttpUrlConnection(); + } + super.doThrow(context); + return context; + } + @Override protected int getStatusCode(ExecuteContext context) { Object statusCode = context.getLocalFieldValue(STATUS_CODE); @@ -60,4 +77,9 @@ protected int getStatusCode(ExecuteContext context) { @Override protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { } + + @Override + protected boolean canInvoke(ExecuteContext context) { + return true; + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java index 8df2054999..a1abf01103 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java @@ -19,12 +19,16 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.MapUtils; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; import io.sermant.flowcontrol.common.config.CommonConst; import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.common.xds.retry.RetryCondition; +import io.sermant.flowcontrol.common.xds.retry.RetryConditionType; import sun.net.www.http.HttpClient; import java.io.Closeable; @@ -60,12 +64,8 @@ public HttpUrlConnectionResponseStreamInterceptor() { @Override protected ExecuteContext doBefore(ExecuteContext context) throws Exception { - if (!(context.getObject() instanceof HttpURLConnection)) { - return context; - } - if (XdsThreadLocalUtil.getScenarioInfo() == null) { - return context; - } + // Remove the status to prevent multiple executions of the same request due to enhanced logic + XdsThreadLocalUtil.removeConnectionStatus(); executeWithRetryPolicy(context); return context; } @@ -91,17 +91,24 @@ protected int getStatusCode(ExecuteContext context) { } } + @Override + protected boolean canInvoke(ExecuteContext context) { + return XdsThreadLocalUtil.isConnected() && XdsThreadLocalUtil.getScenarioInfo() != null; + } + @Override protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { tryCloseOldInputStream(result); if (isFirstInvoke) { return; } + HttpURLConnection connection = (HttpURLConnection) obj; + ReflectUtils.setFieldValue(connection,"inputStream", null); + ReflectUtils.setFieldValue(connection,"cachedInputStream", null); Optional serviceInstanceOptional = chooseServiceInstanceForXds(); if (!serviceInstanceOptional.isPresent()) { return; } - HttpURLConnection connection = (HttpURLConnection) obj; ServiceInstance instance = serviceInstanceOptional.get(); resetStats(obj); URL url = connection.getURL(); @@ -172,10 +179,7 @@ private void tryCloseOldInputStream(Object rawInputStream) { public static class HttpUrlConnectionRetry extends AbstractRetry { @Override public Optional getCode(Object result) { - if (!(result instanceof HttpURLConnection)) { - return Optional.empty(); - } - HttpURLConnection connection = (HttpURLConnection) result; + HttpURLConnection connection = XdsThreadLocalUtil.getHttpUrlConnection(); try { return Optional.of(String.valueOf(connection.getResponseCode())); } catch (IOException io) { @@ -186,10 +190,7 @@ public Optional getCode(Object result) { @Override public Optional> getHeaderNames(Object result) { - if (!(result instanceof HttpURLConnection)) { - return Optional.empty(); - } - HttpURLConnection connection = (HttpURLConnection) result; + HttpURLConnection connection = XdsThreadLocalUtil.getHttpUrlConnection(); Set headerNames = new HashSet<>(); if (MapUtils.isEmpty(connection.getHeaderFields())) { return Optional.empty(); @@ -201,6 +202,35 @@ public Optional> getHeaderNames(Object result) { return Optional.of(headerNames); } + @Override + public boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy) { + List conditions = retryPolicy.getRetryConditions(); + if (CollectionUtils.isEmpty(conditions)) { + return false; + } + Optional statusCodeOptional = this.getCode(null); + if (!statusCodeOptional.isPresent()) { + return false; + } + String statusCode = statusCodeOptional.get(); + for (String conditionName : conditions) { + Optional retryConditionOptional = RetryConditionType. + getRetryConditionByName(conditionName); + if (!retryConditionOptional.isPresent()) { + continue; + } + if (retryConditionOptional.get().needRetry(null, throwable, statusCode, null)) { + return true; + } + } + return false; + } + + @Override + public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) { + return this.isNeedRetry((Throwable) null, retryPolicy); + } + @Override public Class[] retryExceptions() { return getRetryExceptions(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java index 4a9b61161b..a109557874 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java @@ -89,6 +89,7 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception { if (flowControlResult.isSkip()) { Response.Builder builder = new Response.Builder(); context.skip(builder.code(flowControlResult.getResponse().getCode()) + .protocol(Protocol.HTTP_1_1) .message(flowControlResult.buildResponseMsg()).request(request).build()); return context; } @@ -99,6 +100,7 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception { Response.Builder builder = new Response.Builder(); context.skip(builder.code(CommonConst.INTERVAL_SERVER_ERROR) .message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1).build()); + return context; } // Execute service invocation and retry logic diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java index b6543cf4c6..f82895692a 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java @@ -93,6 +93,7 @@ protected ExecuteContext doBefore(ExecuteContext context) throws Exception { Response.Builder builder = new Response.Builder(); context.skip(builder.code(CommonConst.INTERVAL_SERVER_ERROR) .message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1).build()); + return context; } // Execute service invocation and retry logic diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/AlibabaDubboClusterInvoker.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/AlibabaDubboClusterInvoker.java index bca067c05c..181bea568c 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/AlibabaDubboClusterInvoker.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/AlibabaDubboClusterInvoker.java @@ -211,7 +211,7 @@ private String getRemoteApplication(URL url, String interfaceName) { */ public static class AlibabaDubboRetry extends AbstractRetry { @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { // dubbo does not support status codes return false; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/ApacheDubboClusterInvoker.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/ApacheDubboClusterInvoker.java index 85295d8550..e9abdf7a21 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/ApacheDubboClusterInvoker.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/cluster/ApacheDubboClusterInvoker.java @@ -215,7 +215,7 @@ private String getRemoteApplication(URL url, String interfaceName) { */ public static class ApacheDubboRetry extends AbstractRetry { @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { // dubbo does not support status codes return false; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java index 9906120a0d..77db1e6599 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java @@ -18,14 +18,10 @@ package io.sermant.flowcontrol.retry.handler; import io.sermant.core.service.xds.entity.XdsRetryPolicy; -import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.ReflectUtils; -import io.sermant.core.utils.StringUtils; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; import io.sermant.flowcontrol.common.handler.retry.Retry; -import io.sermant.flowcontrol.common.xds.retry.RetryCondition; -import io.sermant.flowcontrol.common.xds.retry.RetryConditionType; import java.io.IOException; import java.net.ConnectException; @@ -73,9 +69,8 @@ public Predicate createExceptionPredicate(Class[ } @Override - public Predicate createExceptionPredicate(Class[] retryExceptions, - XdsRetryPolicy policy) { - return (Throwable ex) -> needRetry(ex, policy); + public Predicate createExceptionPredicate(Retry retry, XdsRetryPolicy policy) { + return (Throwable ex) -> retry.isNeedRetry(ex, policy); } private Predicate createExceptionPredicate(Class retryClass) { @@ -129,60 +124,11 @@ public Predicate createResultPredicate(Retry retry, RetryRule rule) { if (retryOnResponseStatus.isEmpty()) { retryOnResponseStatus.addAll(DEFAULT_RETRY_ON_RESPONSE_STATUS); } - return result -> retry.needRetry(new HashSet<>(retryOnResponseStatus), result); + return result -> retry.isNeedRetry(new HashSet<>(retryOnResponseStatus), result); } @Override public Predicate createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy) { - return result -> needRetry(retry, result, xdsRetryPolicy); - } - - private boolean needRetry(Retry retry, Object result, XdsRetryPolicy retryPolicy) { - List conditions = getRetryConditions(retryPolicy); - if (CollectionUtils.isEmpty(conditions)) { - return false; - } - Optional statusCodeOptional = retry.getCode(result); - if (!statusCodeOptional.isPresent()) { - return false; - } - String statusCode = statusCodeOptional.get(); - if (conditions.contains(statusCode)) { - return true; - } - for (String conditionName : conditions) { - Optional retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName); - if (!retryConditionOptional.isPresent()) { - continue; - } - boolean needRetry = retryConditionOptional.get().needRetry(retry, null, statusCode, result); - if (needRetry) { - return true; - } - } - return false; - } - - private boolean needRetry(Throwable ex, XdsRetryPolicy retryPolicy) { - List conditions = getRetryConditions(retryPolicy); - for (String conditionName : conditions) { - Optional retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName); - if (!retryConditionOptional.isPresent()) { - continue; - } - boolean needRetry = retryConditionOptional.get().needRetry(null, ex, null, null); - if (needRetry) { - return true; - } - } - return false; - } - - private static List getRetryConditions(XdsRetryPolicy xdsRetryPolicy) { - String retryOn = xdsRetryPolicy.getRetryOn(); - if (StringUtils.isExist(retryOn)) { - return Arrays.asList(retryOn.split(",")); - } - return Collections.emptyList(); + return result -> retry.isNeedRetry(result, xdsRetryPolicy); } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java index 270d5c01b1..912ab850eb 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java @@ -22,7 +22,7 @@ import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryRegistry; import io.sermant.core.service.xds.entity.XdsRetryPolicy; -import io.sermant.core.utils.StringUtils; +import io.sermant.core.utils.CollectionUtils; import io.sermant.flowcontrol.common.core.resolver.RetryResolver; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.entity.FlowControlScenario; @@ -55,14 +55,14 @@ public Optional createHandler(FlowControlScenario flowControlScenario, St return Optional.empty(); } XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); - if (retryPolicy.getPerTryTimeout() <= 0 || StringUtils.isEmpty(retryPolicy.getRetryOn()) + if (retryPolicy.getPerTryTimeout() <= 0 || CollectionUtils.isEmpty(retryPolicy.getRetryConditions()) || retryPolicy.getMaxAttempts() <= 0) { return Optional.empty(); } final RetryConfig retryConfig = RetryConfig.custom() .maxAttempts((int)retryPolicy.getMaxAttempts()) .retryOnResult(retryPredicateCreator.createResultPredicate(retry, retryPolicy)) - .retryOnException(retryPredicateCreator.createExceptionPredicate(retry.retryExceptions(), retryPolicy)) + .retryOnException(retryPredicateCreator.createExceptionPredicate(retry, retryPolicy)) .intervalFunction(IntervalFunction.of(retryPolicy.getPerTryTimeout())) .failAfterMaxAttempts(false) .build(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java index 97f5705921..cf2f93a8c6 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java @@ -41,11 +41,11 @@ public interface RetryPredicateCreator { /** * Create exception Predicate * - * @param retryExceptions retry exception set + * @param retry retry * @param policy retry rule * @return Predicate */ - Predicate createExceptionPredicate(Class[] retryExceptions, XdsRetryPolicy policy); + Predicate createExceptionPredicate(Retry retry, XdsRetryPolicy policy); /** * create retry result predicate diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java index d11e04a484..fd72638019 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java @@ -18,6 +18,7 @@ package io.sermant.flowcontrol.retry.handler; import feign.Response; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; @@ -58,7 +59,7 @@ public void test() { static class TestRetry implements Retry { @Override - public boolean needRetry(Set statusList, Object result) { + public boolean isNeedRetry(Set statusList, Object result) { final Optional status = ReflectUtils.invokeMethod(result, "status", null, null); if (status.isPresent()) { final Object code = status.get(); @@ -67,6 +68,16 @@ public boolean needRetry(Set statusList, Object result) { return false; } + @Override + public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) { + return false; + } + + @Override + public boolean isNeedRetry(Throwable throwable, XdsRetryPolicy retryPolicy) { + return false; + } + @Override public Class[] retryExceptions() { return new Class[0]; diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java index a4bc5f6b9e..fc8314fa82 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java @@ -16,9 +16,10 @@ package io.sermant.flowcontrol.res4j.chain.handler; +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; import io.sermant.flowcontrol.common.core.match.XdsRouteMatchManager; import io.sermant.flowcontrol.common.entity.FlowControlScenario; -import io.sermant.flowcontrol.common.entity.FlowControlServiceMeta; import io.sermant.flowcontrol.common.entity.RequestEntity; import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; @@ -32,6 +33,8 @@ * @since 2024-12-05 */ public class XdsBusinessServerRequestHandler extends AbstractChainHandler { + private final ServiceMeta serviceMeta = ConfigManager.getConfig(ServiceMeta.class); + @Override public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { if (XdsThreadLocalUtil.getScenarioInfo() != null) { @@ -39,7 +42,7 @@ public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { return; } FlowControlScenario matchedScenarioEntity = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo( - context.getRequestEntity(), FlowControlServiceMeta.getInstance().getServiceName()); + context.getRequestEntity(), serviceMeta.getService()); context.save(MATCHED_SCENARIO_NAMES, matchedScenarioEntity); XdsThreadLocalUtil.setScenarioInfo(matchedScenarioEntity); super.onBefore(context, matchedScenarioEntity); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java index 29eb8c65d5..874a72a6bc 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java @@ -115,7 +115,8 @@ protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInf if (!XDS_FLOW_CONTROL_CONFIG.isEnable()) { return true; } - return StringUtils.isEmpty(scenarioInfo.getServiceName()) || StringUtils.isEmpty(scenarioInfo.getRouteName()); + return scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getRouteName()); } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java index 6bd3bf5d0f..3b88353b77 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java @@ -91,7 +91,8 @@ protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInf if (!XDS_FLOW_CONTROL_CONFIG.isEnable()) { return true; } - return StringUtils.isEmpty(scenarioInfo.getServiceName()) || StringUtils.isEmpty(scenarioInfo.getRouteName()); + return scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getRouteName()); } @Override