From 926286e7d44955f8a10e859a1e821b76c0a90791 Mon Sep 17 00:00:00 2001 From: hanbingleixue Date: Fri, 27 Dec 2024 16:33:51 +0800 Subject: [PATCH] Add XDS flow control function Signed-off-by: hanbingleixue --- .../sermant-flowcontrol/config/config.yaml | 13 +- .../common/config/XdsFlowControlConfig.java | 4 +- .../flowcontrol-plugin/pom.xml | 21 +++ .../DispatcherServletInterceptor.java | 27 +++- .../inject/DefaultClientHttpResponse.java | 14 +- .../retry/FeignRequestInterceptor.java | 26 +++- .../retry/HttpRequestInterceptor.java | 61 +++++--- .../handler/DefaultRetryPredicateCreator.java | 65 +++++++++ .../retry/handler/RetryHandlerV2.java | 40 +++++- .../retry/handler/RetryPredicateCreator.java | 19 +++ .../service/InterceptorSupporter.java | 6 +- ...io.sermant.core.plugin.config.PluginConfig | 1 + .../SpringLbChooseServerInterceptorTest.java | 25 ++++ ...ringRibbonChooseServerInterceptorTest.java | 35 ++++- .../res4j/chain/AbstractChainHandler.java | 50 +++---- .../res4j/chain/HandlerConstants.java | 21 ++- .../res4j/chain/RequestHandler.java | 15 +- .../res4j/chain/context/ChainContext.java | 2 +- .../chain/handler/BulkheadRequestHandler.java | 17 +-- .../chain/handler/BusinessRequestHandler.java | 38 ++--- .../handler/CircuitBreakerRequestHandler.java | 46 ++++--- .../chain/handler/FaultRequestHandler.java | 12 +- .../chain/handler/FlowControlHandler.java | 9 ++ .../InstanceIsolationRequestHandler.java | 7 +- .../handler/RateLimitingRequestHandler.java | 17 +-- .../chain/handler/SystemServerReqHandler.java | 18 +-- .../XdsBusinessClientRequestHandler.java | 71 ++++++++++ .../XdsBusinessServerRequestHandler.java | 76 ++++++++++ .../chain/handler/XdsFaultRequestHandler.java | 130 ++++++++++++++++++ .../handler/XdsRateLimitRequestHandler.java | 101 ++++++++++++++ .../res4j/exceptions/RateLimitException.java | 48 +++++++ .../exception/FaultExceptionHandler.java | 6 +- .../XdsRateLimitingExceptionHandler.java | 61 ++++++++ ...owcontrol.res4j.chain.AbstractChainHandler | 4 + ...l.res4j.handler.exception.ExceptionHandler | 1 + .../res4j/chain/HandlerChainBuilderTest.java | 20 +++ .../res4j/chain/HandlerChainEntryTest.java | 9 +- .../res4j/chain/HandlerChainTest.java | 40 ++++-- .../res4j/chain/context/ChainContextTest.java | 3 + .../handler/CircuitRequestHandlerTest.java | 3 + .../chain/handler/RequestHandlerTest.java | 6 +- .../exceptions/RateLimitExceptionTest.java | 48 +++++++ .../res4j/handler/HandlerTest.java | 3 + .../service/DubboRest4jServiceImplTest.java | 3 + .../service/HttpRest4jServiceImplTest.java | 3 + 45 files changed, 1063 insertions(+), 182 deletions(-) create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitException.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/XdsRateLimitingExceptionHandler.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitExceptionTest.java diff --git a/sermant-plugins/sermant-flowcontrol/config/config.yaml b/sermant-plugins/sermant-flowcontrol/config/config.yaml index ffa2288e27..105783b1a3 100644 --- a/sermant-plugins/sermant-flowcontrol/config/config.yaml +++ b/sermant-plugins/sermant-flowcontrol/config/config.yaml @@ -11,10 +11,9 @@ flow.control.plugin: xds.flow.control.config: # Whether to enable Xds flow control enable: false - retry: - # The specified response status codes that need to be retried. Retry will be performed when the response's status - # code matches one of the specified codes. - x-sermant-retriable-status-codes: - # The specified response header names that need to be retried. Retry will be performed when the response contains - # the specified headers. - x-sermant-retriable-header-names: + # The specified response status codes that need to be retried. Retry will be performed when the response's status + # code matches one of the specified codes. + x-sermant-retriable-status-codes: + # The specified response header names that need to be retried. Retry will be performed when the response contains + # the specified headers. + x-sermant-retriable-header-names: diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java index 26e7a55f6d..cf16c0774c 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-common/src/main/java/io/sermant/flowcontrol/common/config/XdsFlowControlConfig.java @@ -34,13 +34,13 @@ public class XdsFlowControlConfig implements PluginConfig { /** * Specify the response code for retry, and retry will be executed when the response code is included */ - @ConfigFieldKey("retry.x-sermant-retriable-status-codes") + @ConfigFieldKey("x-sermant-retriable-status-codes") private List retryStatusCodes; /** * Specify the response code for retry, and retry will be executed when the response header is included */ - @ConfigFieldKey("retry.x-sermant-retriable-header-names") + @ConfigFieldKey("x-sermant-retriable-header-names") private List retryHeaderNames; /** diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml index 325e583d05..845236171d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml @@ -27,6 +27,9 @@ 1.4.7.RELEASE 2.2.0.RELEASE 31.1-jre + 4.5.13 + 4.11.0 + 2.7.5 @@ -111,6 +114,24 @@ ${google.guava} provided + + org.apache.httpcomponents + httpclient + ${apache-httpclient.version} + provided + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + provided + + + com.squareup.okhttp + okhttp + ${okhttp.sq.version} + provided + junit diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java index 93f550bf41..b2690a248b 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol; import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.LogUtils; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.common.config.ConfigConst; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -120,13 +122,21 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception return context; } chooseHttpService().onBefore(className, httpRequestEntity.get(), result); - if (result.isSkip()) { - context.skip(null); - final Object response = allArguments[1]; - if (response != null) { - setStatus.accept(response, result.getResponse().getCode()); - getWriter.apply(response).print(result.buildResponseMsg()); + if (!result.isSkip()) { + return context; + } + context.skip(null); + final Object response = allArguments[1]; + if (response == null) { + return context; + } + setStatus.accept(response, result.getResponse().getCode()); + getWriter.apply(response).print(result.buildResponseMsg()); + for (Map.Entry> entry : result.getResponse().getHeaders().entrySet()) { + if (CollectionUtils.isEmpty(entry.getValue())) { + continue; } + setResponseHeader(response, entry.getKey(), entry.getValue().get(0)); } return context; } @@ -167,6 +177,11 @@ private String getHeader(Object httpServletRequest, String key) { new Object[]{key}).orElse(null); } + private Optional setResponseHeader(Object httpServletResponse, String key, String value) { + return ReflectUtils.invokeMethod(httpServletResponse, "setHeader", + new Class[]{String.class, String.class}, new Object[]{key, value}); + } + private PrintWriter getWriter(Object httpServletRequest) { return (PrintWriter) ReflectUtils.invokeMethodWithNoneParameter(httpServletRequest, "getWriter") .orElse(null); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java index 25a1d29074..e14240e15d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java @@ -29,6 +29,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,6 +49,8 @@ public class DefaultClientHttpResponse implements ClientHttpResponse { private final String msg; + private final HttpHeaders responseHeaders; + private InputStream responseStream; /** @@ -63,6 +67,11 @@ public DefaultClientHttpResponse(FlowControlResult flowControlResult) { this.contentType = "application/text;charset=utf-8"; } this.msg = flowControlResult.buildResponseMsg(); + responseHeaders = new HttpHeaders(); + Map> headers = response.getHeaders(); + if (headers != null) { + responseHeaders.putAll(headers); + } } @Override @@ -99,8 +108,7 @@ public InputStream getBody() { @Override public HttpHeaders getHeaders() { - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.add(HttpHeaders.CONTENT_TYPE, contentType); - return httpHeaders; + responseHeaders.add(HttpHeaders.CONTENT_TYPE, contentType); + return responseHeaders; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java index 3f9a616f42..6d72cd5136 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java @@ -41,9 +41,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -209,10 +212,29 @@ public static class FeignRetry extends AbstractRetry { @Override public Optional getCode(Object result) { + Optional resultOptional = getMethodResult(result, "status"); + return resultOptional.map(String::valueOf); + } + + @Override + public Optional> getHeaderNames(Object result) { + Optional resultOptional = getMethodResult(result, "headers"); + if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) { + return Optional.empty(); + } + Map headers = (Map) resultOptional.get(); + Set headerNames = new HashSet<>(); + for (Map.Entry entry : headers.entrySet()) { + headerNames.add(entry.getKey().toString()); + } + return Optional.of(headerNames); + } + + private Optional getMethodResult(Object result, String methodName) { final Optional status = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> { final Method method; try { - method = result.getClass().getDeclaredMethod("status"); + method = result.getClass().getDeclaredMethod(methodName); method.setAccessible(true); return method; } catch (NoSuchMethodException ex) { @@ -225,7 +247,7 @@ public Optional getCode(Object result) { return Optional.empty(); } try { - return Optional.of(String.valueOf(status.get().invoke(result))); + return Optional.of(status.get().invoke(result)); } catch (IllegalAccessException ex) { LOGGER.warning(String.format(Locale.ENGLISH, "Can not find method status from class [%s]!", result.getClass().getCanonicalName())); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java index 9be3fac281..3744f541b9 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java @@ -27,6 +27,7 @@ import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import io.sermant.flowcontrol.inject.DefaultClientHttpResponse; import io.sermant.flowcontrol.inject.RetryClientHttpResponse; import io.sermant.flowcontrol.service.InterceptorSupporter; @@ -39,9 +40,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.logging.Logger; @@ -95,12 +99,12 @@ protected final ExecuteContext doBefore(ExecuteContext context) { if (flowControlResult.isSkip()) { context.skip(new DefaultClientHttpResponse(flowControlResult)); } else { - tryExeWithRetry(context); + tryExeWithRetry(context, httpRequestEntity.get()); } return context; } - private void tryExeWithRetry(ExecuteContext context) { + private void tryExeWithRetry(ExecuteContext context, HttpRequestEntity httpRequestEntity) { final Object[] allArguments = context.getArguments(); final HttpRequest request = (HttpRequest) context.getObject(); Object result; @@ -118,12 +122,8 @@ private void tryExeWithRetry(ExecuteContext context) { } context.afterMethod(result, ex); try { - final Optional httpRequestEntity = convertToHttpEntity(request); - if (!httpRequestEntity.isPresent()) { - return; - } - RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity.get()); - final List handlers = getRetryHandler().getHandlers(httpRequestEntity.get()); + RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity); + final List handlers = getRetryHandler().getHandlers(httpRequestEntity); if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { // retry only one policy request.getHeaders().add(RETRY_KEY, RETRY_VALUE); @@ -143,6 +143,7 @@ private void tryExeWithRetry(ExecuteContext context) { @Override protected ExecuteContext doThrow(ExecuteContext context) { chooseHttpService().onThrow(className, context.getThrowable()); + XdsThreadLocalUtil.removeSendByteFlag(); return context; } @@ -155,6 +156,7 @@ protected final ExecuteContext doAfter(ExecuteContext context) throws IOExceptio chooseHttpService().onThrow(className, defaultException); } chooseHttpService().onAfter(className, context.getResult()); + XdsThreadLocalUtil.removeSendByteFlag(); return context; } @@ -176,9 +178,38 @@ public static class HttpRetry extends AbstractRetry { @Override public Optional getCode(Object result) { + Optional resultOptional = getMethodResult(result, "getRawStatusCode"); + return resultOptional.map(String::valueOf); + } + + @Override + public Optional> getHeaderNames(Object result) { + Optional resultOptional = getMethodResult(result, "getHeaders"); + if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) { + return Optional.empty(); + } + Map headers = (Map) resultOptional.get(); + Set headerNames = new HashSet<>(); + for (Map.Entry entry : headers.entrySet()) { + headerNames.add(entry.getKey().toString()); + } + return Optional.of(headerNames); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING_CLOUD; + } + + private Optional getMethodResult(Object result, String methodName) { final Optional getRawStatusCode = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> { try { - final Method method = result.getClass().getDeclaredMethod("getRawStatusCode"); + final Method method = result.getClass().getDeclaredMethod(methodName); method.setAccessible(true); return method; } catch (NoSuchMethodException ex) { @@ -192,7 +223,7 @@ public Optional getCode(Object result) { return Optional.empty(); } try { - return Optional.of(String.valueOf(getRawStatusCode.get().invoke(result))); + return Optional.of(getRawStatusCode.get().invoke(result)); } catch (IllegalAccessException ex) { LOGGER.warning(String.format(Locale.ENGLISH, "Can not find method getRawStatusCode from class [%s]!", @@ -203,15 +234,5 @@ public Optional getCode(Object result) { } return Optional.empty(); } - - @Override - public Class[] retryExceptions() { - return getRetryExceptions(); - } - - @Override - public RetryFramework retryType() { - return RetryFramework.SPRING_CLOUD; - } } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java index 09375ae46c..9906120a0d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java @@ -17,10 +17,15 @@ package io.sermant.flowcontrol.retry.handler; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.ReflectUtils; +import io.sermant.core.utils.StringUtils; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; import io.sermant.flowcontrol.common.handler.retry.Retry; +import io.sermant.flowcontrol.common.xds.retry.RetryCondition; +import io.sermant.flowcontrol.common.xds.retry.RetryConditionType; import java.io.IOException; import java.net.ConnectException; @@ -67,6 +72,12 @@ public Predicate createExceptionPredicate(Class[ .orElseGet(() -> throwable -> true); } + @Override + public Predicate createExceptionPredicate(Class[] retryExceptions, + XdsRetryPolicy policy) { + return (Throwable ex) -> needRetry(ex, policy); + } + private Predicate createExceptionPredicate(Class retryClass) { return (Throwable ex) -> { if (retryClass.isAssignableFrom(getRealExceptionClass(ex))) { @@ -120,4 +131,58 @@ public Predicate createResultPredicate(Retry retry, RetryRule rule) { } return result -> retry.needRetry(new HashSet<>(retryOnResponseStatus), result); } + + @Override + public Predicate createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy) { + return result -> needRetry(retry, result, xdsRetryPolicy); + } + + private boolean needRetry(Retry retry, Object result, XdsRetryPolicy retryPolicy) { + List conditions = getRetryConditions(retryPolicy); + if (CollectionUtils.isEmpty(conditions)) { + return false; + } + Optional statusCodeOptional = retry.getCode(result); + if (!statusCodeOptional.isPresent()) { + return false; + } + String statusCode = statusCodeOptional.get(); + if (conditions.contains(statusCode)) { + return true; + } + for (String conditionName : conditions) { + Optional retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName); + if (!retryConditionOptional.isPresent()) { + continue; + } + boolean needRetry = retryConditionOptional.get().needRetry(retry, null, statusCode, result); + if (needRetry) { + return true; + } + } + return false; + } + + private boolean needRetry(Throwable ex, XdsRetryPolicy retryPolicy) { + List conditions = getRetryConditions(retryPolicy); + for (String conditionName : conditions) { + Optional retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName); + if (!retryConditionOptional.isPresent()) { + continue; + } + boolean needRetry = retryConditionOptional.get().needRetry(null, ex, null, null); + if (needRetry) { + return true; + } + } + return false; + } + + private static List getRetryConditions(XdsRetryPolicy xdsRetryPolicy) { + String retryOn = xdsRetryPolicy.getRetryOn(); + if (StringUtils.isExist(retryOn)) { + return Arrays.asList(retryOn.split(",")); + } + return Collections.emptyList(); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java index 3c4205d710..270d5c01b1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java @@ -21,10 +21,14 @@ import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryRegistry; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; +import io.sermant.core.utils.StringUtils; import io.sermant.flowcontrol.common.core.resolver.RetryResolver; import io.sermant.flowcontrol.common.core.rule.RetryRule; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.handler.AbstractRequestHandler; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; import io.sermant.flowcontrol.retry.FeignRequestInterceptor.FeignRetry; import io.sermant.flowcontrol.retry.HttpRequestInterceptor.HttpRetry; @@ -39,6 +43,32 @@ public class RetryHandlerV2 extends AbstractRequestHandler { private final RetryPredicateCreator retryPredicateCreator = new DefaultRetryPredicateCreator(); + @Override + public Optional createHandler(FlowControlScenario flowControlScenario, String businessName) { + final io.sermant.flowcontrol.common.handler.retry.Retry retry = RetryContext.INSTANCE.getRetry(); + if (retry == null) { + return Optional.empty(); + } + Optional retryPolicyOptional = XdsHandler.INSTANCE + .getRetryPolicy(flowControlScenario.getServiceName(), flowControlScenario.getRouteName()); + if (!retryPolicyOptional.isPresent()) { + return Optional.empty(); + } + XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); + if (retryPolicy.getPerTryTimeout() <= 0 || StringUtils.isEmpty(retryPolicy.getRetryOn()) + || retryPolicy.getMaxAttempts() <= 0) { + return Optional.empty(); + } + final RetryConfig retryConfig = RetryConfig.custom() + .maxAttempts((int)retryPolicy.getMaxAttempts()) + .retryOnResult(retryPredicateCreator.createResultPredicate(retry, retryPolicy)) + .retryOnException(retryPredicateCreator.createExceptionPredicate(retry.retryExceptions(), retryPolicy)) + .intervalFunction(IntervalFunction.of(retryPolicy.getPerTryTimeout())) + .failAfterMaxAttempts(false) + .build(); + return Optional.of(RetryRegistry.of(retryConfig).retry(businessName)); + } + @Override protected Optional createHandler(String businessName, RetryRule rule) { final io.sermant.flowcontrol.common.handler.retry.Retry retry = RetryContext.INSTANCE.getRetry(); @@ -46,7 +76,7 @@ protected Optional createHandler(String businessName, RetryRule rule) { return Optional.empty(); } final RetryConfig retryConfig = RetryConfig.custom() - .maxAttempts(getMaxAttempts(retry, rule)) + .maxAttempts(getMaxAttempts(retry, rule.getMaxAttempts())) .retryOnResult(retryPredicateCreator.createResultPredicate(retry, rule)) .retryOnException(retryPredicateCreator.createExceptionPredicate(retry.retryExceptions())) .intervalFunction(getIntervalFunction(rule)) @@ -61,14 +91,14 @@ protected Optional createHandler(String businessName, RetryRule rule) { * based approach is{@link FeignRetry}, others are injection * * @param retry retry type - * @param rule rule + * @param maxAttempts maximum retry * @return maximum retry */ - private int getMaxAttempts(io.sermant.flowcontrol.common.handler.retry.Retry retry, RetryRule rule) { + private int getMaxAttempts(io.sermant.flowcontrol.common.handler.retry.Retry retry, int maxAttempts) { if (retry instanceof FeignRetry || retry instanceof HttpRetry) { - return rule.getMaxAttempts(); + return maxAttempts; } - return rule.getMaxAttempts() + 1; + return maxAttempts + 1; } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java index 239641bd4b..97f5705921 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol.retry.handler; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.handler.retry.Retry; @@ -37,6 +38,15 @@ public interface RetryPredicateCreator { */ Predicate createExceptionPredicate(Class[] retryExceptions); + /** + * Create exception Predicate + * + * @param retryExceptions retry exception set + * @param policy retry rule + * @return Predicate + */ + Predicate createExceptionPredicate(Class[] retryExceptions, XdsRetryPolicy policy); + /** * create retry result predicate * @@ -45,4 +55,13 @@ public interface RetryPredicateCreator { * @return Predicate */ Predicate createResultPredicate(Retry retry, RetryRule rule); + + /** + * create retry result predicate + * + * @param retry retry + * @param xdsRetryPolicy retry rule + * @return Predicate + */ + Predicate createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java index 0f7c71ec0b..33f2418554 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java @@ -24,6 +24,7 @@ import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.plugin.service.PluginServiceManager; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.enums.FlowFramework; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; import io.sermant.flowcontrol.common.handler.retry.RetryContext; @@ -78,6 +79,8 @@ public abstract class InterceptorSupporter extends ReflectMethodCacheSupport imp */ protected final FlowControlConfig flowControlConfig; + protected final XdsFlowControlConfig xdsFlowControlConfig; + private final ReentrantLock lock = new ReentrantLock(); private RetryHandlerV2 retryHandler = null; @@ -91,6 +94,7 @@ public abstract class InterceptorSupporter extends ReflectMethodCacheSupport imp */ protected InterceptorSupporter() { flowControlConfig = PluginConfigManager.getPluginConfig(FlowControlConfig.class); + xdsFlowControlConfig = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); } /** @@ -162,7 +166,7 @@ protected final HttpService chooseHttpService() { * @return Method * @throws InvokerWrapperException InvokerWrapperException */ - protected final Supplier createRetryFunc(Object obj, Method method, Object[] allArguments, Object result) { + protected Supplier createRetryFunc(Object obj, Method method, Object[] allArguments, Object result) { return () -> { method.setAccessible(true); try { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig index c9e6fc1731..f2fc25afdd 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig @@ -1 +1,2 @@ io.sermant.flowcontrol.common.config.FlowControlConfig +io.sermant.flowcontrol.common.config.XdsFlowControlConfig diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java index f61cb7ee31..d3c9de8d4b 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java @@ -21,13 +21,19 @@ import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.plugin.agent.interceptor.Interceptor; +import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.TestHelper; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; /** * spring load test @@ -38,6 +44,20 @@ public class SpringLbChooseServerInterceptorTest { private final Object server = new Object(); + private MockedStatic pluginConfigManagerMockedStatic; + + /** + * pre initialization + */ + @Before + public void before() throws Exception { + XdsFlowControlConfig xdsFlowControlConfig = new XdsFlowControlConfig(); + xdsFlowControlConfig.setEnable(true); + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(()->PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(xdsFlowControlConfig); + } + @Test public void testBefore() throws Exception { final Interceptor interceptor = getInterceptor(); @@ -81,4 +101,9 @@ private Object getServer() { protected Interceptor getInterceptor() { return new SpringLbChooseServerInterceptor(); } + + @After + public void after() { + pluginConfigManagerMockedStatic.close(); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java index b3f805ba66..dcf1056a43 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java @@ -24,13 +24,19 @@ import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.plugin.agent.interceptor.Interceptor; +import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.TestHelper; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.List; @@ -44,6 +50,20 @@ public class SpringRibbonChooseServerInterceptorTest { private final Object server = new Object(); + private MockedStatic pluginConfigManagerMockedStatic; + + /** + * pre initialization + */ + @Before + public void before() throws Exception { + XdsFlowControlConfig xdsFlowControlConfig = new XdsFlowControlConfig(); + xdsFlowControlConfig.setEnable(true); + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(()->PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(xdsFlowControlConfig); + } + @Test public void testBefore() throws Exception { final Interceptor interceptor = getInterceptor(); @@ -74,13 +94,26 @@ public void testAfter() throws NoSuchMethodException { RetryContext.INSTANCE.buildRetryPolicy(retryRule); final ExecuteContext context = TestHelper.buildDefaultContext(); context.changeResult(Optional.of(server)); - ReflectUtils.invokeMethod(interceptor, "updateServiceInstance", new Class[]{ExecuteContext.class}, + ReflectUtils.invokeMethod(interceptor, "updateServiceInstance", new Class[]{context.getClass()}, new Object[]{context}); Assert.assertTrue(RetryContext.INSTANCE.getRetryPolicy().getAllRetriedInstance().contains(server)); + List serverList = new ArrayList<>(); + serverList.add(server); + Object instance = new Object(); + serverList.add(instance); + context.changeArgs(new Object[]{serverList}); + ReflectUtils.invokeMethod(interceptor, "removeRetriedServiceInstance", new Class[]{context.getClass()}, + new Object[]{context}); + Assert.assertEquals(((List)context.getArguments()[0]).get(0), instance); RetryContext.INSTANCE.remove(); } private Interceptor getInterceptor() { return new SpringRibbonChooseServerInterceptor(); } + + @After + public void after() { + pluginConfigManagerMockedStatic.close(); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java index ea000d4924..1162b840ad 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/AbstractChainHandler.java @@ -17,11 +17,12 @@ package io.sermant.flowcontrol.res4j.chain; +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; -import java.util.Set; - /** * abstract handler chain, the order of execution is as follows: *

onBefore -> onThrow(executed when an exception exists) -> onResult

@@ -30,38 +31,41 @@ * @since 2022-07-11 */ public abstract class AbstractChainHandler implements RequestHandler, Comparable { - private static final int EXTRA_LENGTH_FOR_METHOD_CACHE_KEY = 11; + protected static final String MATCHED_SCENARIO_NAMES = "__MATCHED_SCENARIO_ENTITY__"; + + protected static final XdsFlowControlConfig XDS_FLOW_CONTROL_CONFIG = + PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); private AbstractChainHandler next; @Override - public void onBefore(RequestContext context, Set businessNames) { - AbstractChainHandler cur = getNextHandler(context, businessNames); + public void onBefore(RequestContext context, FlowControlScenario flowControlScenario) { + AbstractChainHandler cur = getNextHandler(context, flowControlScenario); if (cur != null) { - cur.onBefore(context, businessNames); + cur.onBefore(context, flowControlScenario); } } @Override - public void onThrow(RequestContext context, Set businessNames, Throwable throwable) { - AbstractChainHandler cur = getNextHandler(context, businessNames); + public void onThrow(RequestContext context, FlowControlScenario flowControlScenario, Throwable throwable) { + AbstractChainHandler cur = getNextHandler(context, flowControlScenario); if (cur != null) { - cur.onThrow(context, businessNames, throwable); + cur.onThrow(context, flowControlScenario, throwable); } } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { - AbstractChainHandler cur = getNextHandler(context, businessNames); + public void onResult(RequestContext context, FlowControlScenario flowControlScenario, Object result) { + AbstractChainHandler cur = getNextHandler(context, flowControlScenario); if (cur != null) { - cur.onResult(context, businessNames, result); + cur.onResult(context, flowControlScenario, result); } } - private AbstractChainHandler getNextHandler(RequestContext context, Set businessNames) { + private AbstractChainHandler getNextHandler(RequestContext context, FlowControlScenario flowControlScenario) { AbstractChainHandler tmp = next; while (tmp != null) { - if (!isNeedSkip(tmp, context, businessNames)) { + if (!isNeedSkip(tmp, context, flowControlScenario)) { break; } tmp = tmp.getNext(); @@ -69,15 +73,16 @@ private AbstractChainHandler getNextHandler(RequestContext context, Set return tmp; } - private boolean isNeedSkip(AbstractChainHandler tmp, RequestContext context, Set businessNames) { + private boolean isNeedSkip(AbstractChainHandler tmp, RequestContext context, FlowControlScenario scenario) { final RequestType direct = tmp.direct(); - if (direct != RequestType.BOTH && context.getRequestEntity().getRequestType() != direct) { + if (direct != RequestType.BOTH && context.getRequestEntity() != null + && context.getRequestEntity().getRequestType() != direct) { return true; } final String skipKey = skipCacheKey(tmp); Boolean isSkip = context.get(skipKey, Boolean.class); if (isSkip == null) { - isSkip = tmp.isSkip(context, businessNames); + isSkip = tmp.isSkip(context, scenario); context.save(skipKey, isSkip); } return isSkip; @@ -88,10 +93,7 @@ private String skipCacheKey(AbstractChainHandler tmp) { RequestType direct = tmp.direct(); // The length of the String Builder is initialized for performance - StringBuilder sb = - new StringBuilder(className.length() + direct.name().length() + EXTRA_LENGTH_FOR_METHOD_CACHE_KEY); - sb.append(direct).append("_").append(className).append("_skip_flag"); - return sb.toString(); + return direct + "_" + className + "_skip_flag"; } /** @@ -108,11 +110,11 @@ protected RequestType direct() { * whether to skip the current handler * * @param context requestContext - * @param businessNames matching scene name + * @param flowControlScenario matched scenario information * @return skip or not */ - protected boolean isSkip(RequestContext context, Set businessNames) { - return false; + protected boolean isSkip(RequestContext context, FlowControlScenario flowControlScenario) { + return XDS_FLOW_CONTROL_CONFIG.isEnable(); } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerConstants.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerConstants.java index 52fac80c09..adfddc7798 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerConstants.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/HandlerConstants.java @@ -34,20 +34,35 @@ public class HandlerConstants { */ public static final int BUSINESS_ORDER = -1000; + /** + * xds service processing priority + */ + public static final int XDS_BUSINESS_ORDER = -1001; + /** * fault injection priority */ - public static final int FAULT_ORDER = 3000; + public static final int FAULT_ORDER = 4000; + + /** + * xds fault injection priority + */ + public static final int XDS_FAULT_ORDER = 3000; + + /** + * rate limiting priority + */ + public static final int RATE_LIMIT_ORDER = 6000; /** * rate limiting priority */ - public static final int RATE_LIMIT_ORDER = 4000; + public static final int XDS_RATE_LIMIT_ORDER = 5000; /** * isolation bin priority */ - public static final int BULK_HEAD_ORDER = 5000; + public static final int BULK_HEAD_ORDER = 7000; /** * Instance isolation priority, which must be greater than the circuit breaker priority diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/RequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/RequestHandler.java index 67bb18ad3f..6467030be1 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/RequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/RequestHandler.java @@ -17,10 +17,9 @@ package io.sermant.flowcontrol.res4j.chain; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; -import java.util.Set; - /** * request handler definition * @@ -32,27 +31,27 @@ public interface RequestHandler { * request processing * * @param context request context - * @param businessNames matched service scenario name + * @param flowControlScenario matched business information */ - void onBefore(RequestContext context, Set businessNames); + void onBefore(RequestContext context, FlowControlScenario flowControlScenario); /** * response processing * * @param context request context - * @param businessNames matched service scenario name + * @param flowControlScenario matched business information * @param result response result */ - void onResult(RequestContext context, Set businessNames, Object result); + void onResult(RequestContext context, FlowControlScenario flowControlScenario, Object result); /** * response processing * * @param context request context - * @param businessNames matched service scenario name + * @param flowControlScenario matched business information * @param throwable throwable */ - void onThrow(RequestContext context, Set businessNames, Throwable throwable); + void onThrow(RequestContext context, FlowControlScenario flowControlScenario, Throwable throwable); /** * priority diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/context/ChainContext.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/context/ChainContext.java index b8f2368f26..d568520191 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/context/ChainContext.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/context/ChainContext.java @@ -32,7 +32,7 @@ public class ChainContext { private static final ThreadLocal> THREAD_LOCAL_CONTEXT_MAP = new ThreadLocal<>(); - private static final int MAX_SIZE = 8; + private static final int MAX_SIZE = 11; private ChainContext() { } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BulkheadRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BulkheadRequestHandler.java index 5ab07a0316..78ed710f9a 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BulkheadRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BulkheadRequestHandler.java @@ -19,13 +19,13 @@ import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadFullException; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.ChainContext; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import io.sermant.flowcontrol.res4j.handler.BulkheadHandler; import java.util.List; -import java.util.Set; /** * isolation bin treatment @@ -37,22 +37,23 @@ public class BulkheadRequestHandler extends FlowControlHandler { private final BulkheadHandler bulkheadHandler = new BulkheadHandler(); @Override - public void onBefore(RequestContext context, Set businessNames) { - final List handlers = bulkheadHandler.createOrGetHandlers(businessNames); + public void onBefore(RequestContext context, FlowControlScenario flowControlScenario) { + final List handlers = + bulkheadHandler.createOrGetHandlers(flowControlScenario.getMatchedScenarioNames()); if (!handlers.isEmpty()) { context.save(getContextName(), handlers); handlers.forEach(Bulkhead::acquirePermission); } - super.onBefore(context, businessNames); + super.onBefore(context, flowControlScenario); } @Override - public void onThrow(RequestContext context, Set businessNames, Throwable throwable) { - super.onThrow(context, businessNames, throwable); + public void onThrow(RequestContext context, FlowControlScenario flowControlScenario, Throwable throwable) { + super.onThrow(context, flowControlScenario, throwable); } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { + public void onResult(RequestContext context, FlowControlScenario flowControlScenario, Object result) { try { final List bulkheads = getHandlersFromCache(context.getSourceName(), getContextName()); if (bulkheads != null && !isOccurBulkheadLimit(context.getSourceName())) { @@ -61,7 +62,7 @@ public void onResult(RequestContext context, Set businessNames, Object r } finally { context.remove(getContextName()); } - super.onResult(context, businessNames, result); + super.onResult(context, flowControlScenario, result); } /** diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java index 9dbba3368c..60a86a5475 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/BusinessRequestHandler.java @@ -18,6 +18,7 @@ package io.sermant.flowcontrol.res4j.chain.handler; import io.sermant.flowcontrol.common.core.match.MatchManager; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.ChainContext; @@ -32,44 +33,33 @@ * @since 2022-07-05 */ public class BusinessRequestHandler extends AbstractChainHandler { - private static final String MATCHED_BUSINESS_NAMES = "__MATCHED_BUSINESS_NAMES__"; - @Override - public void onBefore(RequestContext context, Set businessNames) { + public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { final Set matchBusinessNames = MatchManager.INSTANCE.matchWithCache(context.getRequestEntity()); - if (matchBusinessNames == null || matchBusinessNames.isEmpty()) { - return; + if (scenarioInfo != null) { + scenarioInfo.setMatchedScenarioNames(matchBusinessNames); + super.onBefore(context, scenarioInfo); + } else { + FlowControlScenario flowControlScenario = new FlowControlScenario(); + flowControlScenario.setMatchedScenarioNames(matchBusinessNames); + super.onBefore(context, flowControlScenario); } - context.save(MATCHED_BUSINESS_NAMES, matchBusinessNames); - super.onBefore(context, matchBusinessNames); } @Override - public void onThrow(RequestContext context, Set businessNames, Throwable throwable) { - final Set matchBusinessNames = getMatchedBusinessNames(context); - if (matchBusinessNames == null || matchBusinessNames.isEmpty()) { - return; - } - super.onThrow(context, matchBusinessNames, throwable); + public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { + super.onThrow(context, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { - final Set matchBusinessNames = getMatchedBusinessNames(context); - if (matchBusinessNames == null || matchBusinessNames.isEmpty()) { - return; - } + public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { try { - super.onResult(context, matchBusinessNames, result); + super.onResult(context, scenarioInfo, result); } finally { - ChainContext.getThreadLocalContext(context.getSourceName()).remove(MATCHED_BUSINESS_NAMES); + ChainContext.getThreadLocalContext(context.getSourceName()).remove(MATCHED_SCENARIO_NAMES); } } - private Set getMatchedBusinessNames(RequestContext context) { - return context.get(MATCHED_BUSINESS_NAMES, Set.class); - } - @Override public int getOrder() { return HandlerConstants.BUSINESS_ORDER; diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitBreakerRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitBreakerRequestHandler.java index 3289ef3396..d9c7a56fdd 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitBreakerRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitBreakerRequestHandler.java @@ -18,6 +18,8 @@ package io.sermant.flowcontrol.res4j.chain.handler; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.adaptor.CircuitBreakerAdaptor; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; @@ -25,7 +27,6 @@ import io.sermant.flowcontrol.res4j.handler.CircuitBreakerHandler; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -42,19 +43,21 @@ public class CircuitBreakerRequestHandler extends FlowControlHandler businessNames) { - final List circuitBreakers = circuitBreakerHandler.createOrGetHandlers(businessNames); + public void onBefore(RequestContext context, FlowControlScenario flowControlScenario) { + final List circuitBreakers = + circuitBreakerHandler.createOrGetHandlers(flowControlScenario.getMatchedScenarioNames()); if (!circuitBreakers.isEmpty()) { for (CircuitBreaker circuitBreaker : circuitBreakers) { checkForceOpen(circuitBreaker); checkCircuitBreakerState(circuitBreaker); } - // 这里使用内置方法获取时间, 列表中的每个熔断器时间均一致,因此取第一个 + // Use the built-in method to get the time. Since the time of each circuit breaker in the list is the same, + // take the first one context.save(getStartTime(), circuitBreakers.get(0).getCurrentTimestamp()); context.save(getContextName(), circuitBreakers); } - super.onBefore(context, businessNames); + super.onBefore(context, flowControlScenario); } private void checkCircuitBreakerState(CircuitBreaker circuitBreaker) { @@ -65,34 +68,34 @@ private void checkCircuitBreakerState(CircuitBreaker circuitBreaker) { } /** - * 强制开启状态直接抛出熔断异常 + * In forced open state, throw a circuit breaker exception directly * - * @param circuitBreaker 熔断器 + * @param circuitBreaker Circuit Breaker */ private void checkForceOpen(CircuitBreaker circuitBreaker) { if (circuitBreaker instanceof CircuitBreakerAdaptor) { if (((CircuitBreakerAdaptor) circuitBreaker).isForceOpen()) { - // 强制开启则直接抛出异常 + // Force open to throw an exception directly throw CircuitBreakerException.createException(circuitBreaker); } } } @Override - public void onThrow(RequestContext context, Set businessNames, Throwable throwable) { + public void onThrow(RequestContext context, FlowControlScenario scenario, Throwable throwable) { process(context, throwable, null, false); - super.onThrow(context, businessNames, throwable); + super.onThrow(context, scenario, throwable); } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { + public void onResult(RequestContext context, FlowControlScenario scenario, Object result) { try { process(context, null, result, true); } finally { context.remove(getContextName()); context.remove(getStartTime()); } - super.onResult(context, businessNames, result); + super.onResult(context, scenario, result); } private void process(RequestContext context, Throwable throwable, Object result, boolean isResult) { @@ -112,8 +115,9 @@ private void process(RequestContext context, Throwable throwable, Object result, } @Override - protected boolean isSkip(RequestContext context, Set businessNames) { - return isForceClose(circuitBreakerHandler.createOrGetHandlers(businessNames)); + protected boolean isSkip(RequestContext context, FlowControlScenario scenario) { + return scenario == null || CollectionUtils.isEmpty(scenario.getMatchedScenarioNames()) + || isForceClose(circuitBreakerHandler.createOrGetHandlers(scenario.getMatchedScenarioNames())); } private boolean isForceClose(List circuitBreakers) { @@ -122,7 +126,7 @@ private boolean isForceClose(List circuitBreakers) { continue; } if (((CircuitBreakerAdaptor) circuitBreaker).isForceClosed()) { - // 强制关闭则跳过当前处理器逻辑 + // Force shutdown skips current processor logic return true; } } @@ -130,9 +134,9 @@ private boolean isForceClose(List circuitBreakers) { } /** - * 获取当前缓存上下文名称 + * Get the name of the current cache context * - * @return 缓存上下文名称 + * @return the name of the current cache context */ @Override protected String getContextName() { @@ -140,9 +144,9 @@ protected String getContextName() { } /** - * 获取当前缓存上下文开始时间 + * Get the start time of the current cache context * - * @return 缓存上下文开始时间 + * @return the start time of the current cache context */ protected String getStartTime() { return START_TIME; @@ -154,9 +158,9 @@ public int getOrder() { } /** - * 获取控制器 + * Get the controller * - * @return 控制器 + * @return controller */ protected CircuitBreakerHandler getHandler() { return new CircuitBreakerHandler(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FaultRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FaultRequestHandler.java index 9940a0f47b..9d1508d493 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FaultRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FaultRequestHandler.java @@ -18,13 +18,13 @@ package io.sermant.flowcontrol.res4j.chain.handler; import io.sermant.flowcontrol.common.core.rule.fault.Fault; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import io.sermant.flowcontrol.res4j.handler.FaultHandler; import java.util.List; -import java.util.Set; /** * error injection request handler @@ -38,19 +38,19 @@ public class FaultRequestHandler extends FlowControlHandler { private final String contextName = FaultRequestHandler.class.getName(); @Override - public void onBefore(RequestContext context, Set businessNames) { - final List faults = faultHandler.createOrGetHandlers(businessNames); + public void onBefore(RequestContext context, FlowControlScenario flowControlScenario) { + final List faults = faultHandler.createOrGetHandlers(flowControlScenario.getMatchedScenarioNames()); if (!faults.isEmpty()) { faults.forEach(Fault::acquirePermission); context.save(getContextName(), faults); } - super.onBefore(context, businessNames); + super.onBefore(context, flowControlScenario); } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { + public void onResult(RequestContext context, FlowControlScenario flowControlScenario, Object result) { context.remove(getContextName()); - super.onResult(context, businessNames, result); + super.onResult(context, flowControlScenario, result); } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FlowControlHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FlowControlHandler.java index c4446e84a3..4ae4dd7c81 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FlowControlHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/FlowControlHandler.java @@ -17,8 +17,11 @@ package io.sermant.flowcontrol.res4j.chain.handler; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; import io.sermant.flowcontrol.res4j.chain.context.ChainContext; +import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import java.util.List; @@ -51,4 +54,10 @@ protected List getHandlersFromCache(String sourceName, String cacheName) { protected String getContextName() { return getClass().getName(); } + + @Override + protected boolean isSkip(RequestContext context, FlowControlScenario flowControlScenario) { + return XDS_FLOW_CONTROL_CONFIG.isEnable() || flowControlScenario == null + || CollectionUtils.isEmpty(flowControlScenario.getMatchedScenarioNames()); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/InstanceIsolationRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/InstanceIsolationRequestHandler.java index fbc4bcd524..7de68c439b 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/InstanceIsolationRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/InstanceIsolationRequestHandler.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol.res4j.chain.handler; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; @@ -25,8 +26,6 @@ import io.sermant.flowcontrol.res4j.handler.CircuitBreakerHandler; import io.sermant.flowcontrol.res4j.handler.InstanceIsolationHandler; -import java.util.Set; - /** * Instance isolation. Instance isolation applies only to clients * @@ -39,9 +38,9 @@ public class InstanceIsolationRequestHandler extends CircuitBreakerRequestHandle private static final String START_TIME = CONTEXT_NAME + "_START_TIME"; @Override - public void onBefore(RequestContext context, Set businessNames) { + public void onBefore(RequestContext context, FlowControlScenario flowControlScenario) { try { - super.onBefore(context, businessNames); + super.onBefore(context, flowControlScenario); } catch (CircuitBreakerException ex) { throw InstanceIsolationException.createException(ex.getCircuitBreaker()); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/RateLimitingRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/RateLimitingRequestHandler.java index 54cd91698b..d7b18a1c65 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/RateLimitingRequestHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/RateLimitingRequestHandler.java @@ -18,12 +18,12 @@ package io.sermant.flowcontrol.res4j.chain.handler; import io.github.resilience4j.ratelimiter.RateLimiter; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; import io.sermant.flowcontrol.res4j.handler.RateLimitingHandler; import java.util.List; -import java.util.Set; /** * Current limiting request handler @@ -35,26 +35,27 @@ public class RateLimitingRequestHandler extends FlowControlHandler private final RateLimitingHandler rateLimitingHandler = new RateLimitingHandler(); @Override - public void onBefore(RequestContext context, Set businessNames) { - final List handlers = rateLimitingHandler.createOrGetHandlers(businessNames); + public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { + final List handlers = + rateLimitingHandler.createOrGetHandlers(scenarioInfo.getMatchedScenarioNames()); if (!handlers.isEmpty()) { context.save(getContextName(), handlers); handlers.forEach(rateLimiter -> RateLimiter.waitForPermission(rateLimiter, 1)); } - super.onBefore(context, businessNames); + super.onBefore(context, scenarioInfo); } @Override - public void onThrow(RequestContext context, Set businessNames, Throwable throwable) { + public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { final List rateLimiters = getHandlersFromCache(context.getSourceName(), getContextName()); if (rateLimiters != null) { rateLimiters.forEach(rateLimiter -> rateLimiter.onError(throwable)); } - super.onThrow(context, businessNames, throwable); + super.onThrow(context, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { + public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { try { final List rateLimiters = getHandlersFromCache(context.getSourceName(), getContextName()); if (rateLimiters != null) { @@ -63,7 +64,7 @@ public void onResult(RequestContext context, Set businessNames, Object r } finally { context.remove(getContextName()); } - super.onResult(context, businessNames, result); + super.onResult(context, scenarioInfo, result); } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/SystemServerReqHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/SystemServerReqHandler.java index 7912e6b2db..59e0397b29 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/SystemServerReqHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/SystemServerReqHandler.java @@ -19,6 +19,7 @@ import io.sermant.flowcontrol.common.config.CommonConst; import io.sermant.flowcontrol.common.core.rule.fault.Fault; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.res4j.chain.HandlerConstants; import io.sermant.flowcontrol.res4j.chain.context.RequestContext; @@ -28,7 +29,6 @@ import io.sermant.flowcontrol.res4j.windows.WindowsArray; import java.util.List; -import java.util.Set; /** * system rule flow control Handler @@ -42,11 +42,11 @@ public class SystemServerReqHandler extends FlowControlHandler { private final String contextName = SystemServerReqHandler.class.getName(); @Override - public void onBefore(RequestContext context, Set businessNames) { + public void onBefore(RequestContext context, FlowControlScenario businessEntity) { if (SystemRuleUtils.isEnableSystemRule()) { - // flow control detection - final List faults = systemRuleHandler.createOrGetHandlers(businessNames); + final List faults = + systemRuleHandler.createOrGetHandlers(businessEntity.getMatchedScenarioNames()); if (!faults.isEmpty()) { faults.forEach(Fault::acquirePermission); } @@ -55,18 +55,18 @@ public void onBefore(RequestContext context, Set businessNames) { context.save(CommonConst.REQUEST_START_TIME, System.currentTimeMillis()); WindowsArray.INSTANCE.addThreadNum(context.get(CommonConst.REQUEST_START_TIME, long.class)); } - super.onBefore(context, businessNames); + super.onBefore(context, businessEntity); } @Override - public void onThrow(RequestContext context, Set businessNames, Throwable throwable) { + public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { context.remove(getContextName()); context.remove(CommonConst.REQUEST_START_TIME); - super.onThrow(context, businessNames, throwable); + super.onThrow(context, scenarioInfo, throwable); } @Override - public void onResult(RequestContext context, Set businessNames, Object result) { + public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { if (SystemRuleUtils.isEnableSystemRule() && context.hasKey(CommonConst.REQUEST_START_TIME)) { long startTime = context.get(CommonConst.REQUEST_START_TIME, long.class); WindowsArray.INSTANCE.addSuccess(startTime); @@ -75,7 +75,7 @@ public void onResult(RequestContext context, Set businessNames, Object r context.remove(CommonConst.REQUEST_START_TIME); } context.remove(getContextName()); - super.onResult(context, businessNames, result); + super.onResult(context, scenarioInfo, result); } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java new file mode 100644 index 0000000000..0938bff1cd --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessClientRequestHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain.handler; + +import io.sermant.flowcontrol.common.core.match.XdsRouteMatchManager; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.HandlerConstants; +import io.sermant.flowcontrol.res4j.chain.context.RequestContext; + +/** + * Business handler class for client requests, Get the matched scenario information based on the routing match rule + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsBusinessClientRequestHandler extends AbstractChainHandler { + @Override + public void onBefore(RequestContext context, FlowControlScenario scenario) { + FlowControlScenario matchedScenario = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo( + context.getRequestEntity(), context.getRequestEntity().getServiceName()); + context.save(MATCHED_SCENARIO_NAMES, matchedScenario); + XdsThreadLocalUtil.setScenarioInfo(matchedScenario); + super.onBefore(context, matchedScenario); + } + + @Override + public void onThrow(RequestContext context, FlowControlScenario scenario, Throwable throwable) { + super.onThrow(context, scenario, throwable); + } + + @Override + public void onResult(RequestContext context, FlowControlScenario scenario, Object result) { + try { + super.onResult(context, scenario, result); + } finally { + XdsThreadLocalUtil.removeScenarioInfo(); + } + } + + @Override + public int getOrder() { + return HandlerConstants.XDS_BUSINESS_ORDER; + } + + @Override + protected boolean isSkip(RequestContext context, FlowControlScenario scenario) { + return !XDS_FLOW_CONTROL_CONFIG.isEnable(); + } + + @Override + protected RequestEntity.RequestType direct() { + return RequestEntity.RequestType.CLIENT; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java new file mode 100644 index 0000000000..a4bc5f6b9e --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsBusinessServerRequestHandler.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain.handler; + +import io.sermant.flowcontrol.common.core.match.XdsRouteMatchManager; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.FlowControlServiceMeta; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.HandlerConstants; +import io.sermant.flowcontrol.res4j.chain.context.RequestContext; + +/** + * Business handler class for server requests, Get the matched scenario information based on the routing match rule + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsBusinessServerRequestHandler extends AbstractChainHandler { + @Override + public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { + if (XdsThreadLocalUtil.getScenarioInfo() != null) { + super.onBefore(context, XdsThreadLocalUtil.getScenarioInfo()); + return; + } + FlowControlScenario matchedScenarioEntity = XdsRouteMatchManager.INSTANCE.getMatchedScenarioInfo( + context.getRequestEntity(), FlowControlServiceMeta.getInstance().getServiceName()); + context.save(MATCHED_SCENARIO_NAMES, matchedScenarioEntity); + XdsThreadLocalUtil.setScenarioInfo(matchedScenarioEntity); + super.onBefore(context, matchedScenarioEntity); + } + + @Override + public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { + super.onThrow(context, scenarioInfo, throwable); + } + + @Override + public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { + try { + super.onResult(context, scenarioInfo, result); + } finally { + XdsThreadLocalUtil.removeScenarioInfo(); + } + } + + @Override + public int getOrder() { + return HandlerConstants.XDS_BUSINESS_ORDER; + } + + @Override + protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInfo) { + return !XDS_FLOW_CONTROL_CONFIG.isEnable(); + } + + @Override + protected RequestEntity.RequestType direct() { + return RequestEntity.RequestType.SERVER; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java new file mode 100644 index 0000000000..29eb8c65d5 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsFaultRequestHandler.java @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain.handler; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.xds.entity.FractionalPercent; +import io.sermant.core.service.xds.entity.XdsAbort; +import io.sermant.core.service.xds.entity.XdsDelay; +import io.sermant.core.service.xds.entity.XdsHttpFault; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.core.rule.fault.FaultException; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.RandomUtil; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; +import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.HandlerConstants; +import io.sermant.flowcontrol.res4j.chain.context.RequestContext; + +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Current limiting request handler + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsFaultRequestHandler extends AbstractChainHandler { + private static final String MESSAGE = "Request has been aborted by fault-ThrowException"; + + private static final Logger LOGGER = LoggerFactory.getLogger(); + + @Override + public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { + Optional xdsHttpFaultOptional = XdsHandler.INSTANCE. + getHttpFault(scenarioInfo.getServiceName(), scenarioInfo.getRouteName()); + if (!xdsHttpFaultOptional.isPresent()) { + super.onBefore(context, scenarioInfo); + return; + } + XdsHttpFault xdsHttpFault = xdsHttpFaultOptional.get(); + if (xdsHttpFault.getAbort() == null && xdsHttpFault.getDelay() == null) { + super.onBefore(context, scenarioInfo); + return; + } + executeAbort(xdsHttpFault.getAbort()); + executeDelay(xdsHttpFault.getDelay()); + super.onBefore(context, scenarioInfo); + } + + private void executeAbort(XdsAbort xdsAbort) { + if (xdsAbort.getPercentage() == null) { + return; + } + FractionalPercent fractionalPercent = xdsAbort.getPercentage(); + if (fractionalPercent.getNumerator() <= 0 || fractionalPercent.getDenominator() <= 0) { + return; + } + int randomNum = RandomUtil.randomInt(fractionalPercent.getDenominator()); + if (randomNum < fractionalPercent.getNumerator()) { + int status = xdsAbort.getHttpStatus() > 0 ? xdsAbort.getHttpStatus() : CommonConst.INTERVAL_SERVER_ERROR; + throw new FaultException(status, MESSAGE, null); + } + } + + private void executeDelay(XdsDelay delay) { + if (delay.getFixedDelay() <= 0 || delay.getPercentage() == null) { + return; + } + FractionalPercent fractionalPercent = delay.getPercentage(); + if (fractionalPercent.getNumerator() <= 0 || fractionalPercent.getDenominator() <= 0) { + return; + } + int randomNum = RandomUtil.randomInt(fractionalPercent.getDenominator()); + if (randomNum >= fractionalPercent.getNumerator()) { + return; + } + LOGGER.log(Level.FINE, "Start delay request by delay fault, delay time is {0}ms", delay.getFixedDelay()); + try { + Thread.sleep(delay.getFixedDelay()); + } catch (InterruptedException ignored) { + // ignored + } + } + + @Override + public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { + super.onThrow(context, scenarioInfo, throwable); + } + + @Override + public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { + super.onResult(context, scenarioInfo, result); + } + + @Override + protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInfo) { + if (!XDS_FLOW_CONTROL_CONFIG.isEnable()) { + return true; + } + return StringUtils.isEmpty(scenarioInfo.getServiceName()) || StringUtils.isEmpty(scenarioInfo.getRouteName()); + } + + @Override + protected RequestEntity.RequestType direct() { + return RequestEntity.RequestType.CLIENT; + } + + @Override + public int getOrder() { + return HandlerConstants.XDS_FAULT_ORDER; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java new file mode 100644 index 0000000000..6bd3bf5d0f --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/chain/handler/XdsRateLimitRequestHandler.java @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.chain.handler; + +import io.sermant.core.service.xds.entity.FractionalPercent; +import io.sermant.core.service.xds.entity.XdsRateLimit; +import io.sermant.core.service.xds.entity.XdsTokenBucket; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.RandomUtil; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; +import io.sermant.flowcontrol.common.xds.ratelimit.XdsRateLimitManager; +import io.sermant.flowcontrol.res4j.chain.AbstractChainHandler; +import io.sermant.flowcontrol.res4j.chain.HandlerConstants; +import io.sermant.flowcontrol.res4j.chain.context.RequestContext; +import io.sermant.flowcontrol.res4j.exceptions.RateLimitException; + +import java.util.Optional; + +/** + * Current limiting request handler + * + * @author zhp + * @since 2024-12-05 + */ +public class XdsRateLimitRequestHandler extends AbstractChainHandler { + @Override + public void onBefore(RequestContext context, FlowControlScenario scenarioInfo) { + handleRateLimit(scenarioInfo); + super.onBefore(context, scenarioInfo); + } + + private void handleRateLimit(FlowControlScenario scenarioInfo) { + Optional xdsRateLimitOptional = XdsHandler.INSTANCE.getRateLimit( + scenarioInfo.getServiceName(), scenarioInfo.getRouteName(), scenarioInfo.getClusterName()); + if (!xdsRateLimitOptional.isPresent()) { + return; + } + XdsRateLimit xdsRateLimit = xdsRateLimitOptional.get(); + XdsTokenBucket tokenBucket = xdsRateLimit.getTokenBucket(); + if (tokenBucket == null || xdsRateLimit.getPercent() == null + || tokenBucket.getMaxTokens() <= 0 || tokenBucket.getFillInterval() <= 0) { + return; + } + FractionalPercent fractionalPercent = xdsRateLimit.getPercent(); + if (fractionalPercent.getNumerator() <= 0 || fractionalPercent.getDenominator() <= 0) { + return; + } + int randomNum = RandomUtil.randomInt(fractionalPercent.getDenominator()); + if (randomNum >= fractionalPercent.getNumerator()) { + return; + } + if (!XdsRateLimitManager.fillAndConsumeToken(scenarioInfo.getServiceName(), scenarioInfo.getRouteName(), + tokenBucket)) { + throw new RateLimitException(xdsRateLimit.getResponseHeaderOption()); + } + } + + @Override + public void onThrow(RequestContext context, FlowControlScenario scenarioInfo, Throwable throwable) { + super.onThrow(context, scenarioInfo, throwable); + } + + @Override + public void onResult(RequestContext context, FlowControlScenario scenarioInfo, Object result) { + super.onResult(context, scenarioInfo, result); + } + + @Override + public int getOrder() { + return HandlerConstants.XDS_RATE_LIMIT_ORDER; + } + + @Override + protected boolean isSkip(RequestContext context, FlowControlScenario scenarioInfo) { + if (!XDS_FLOW_CONTROL_CONFIG.isEnable()) { + return true; + } + return StringUtils.isEmpty(scenarioInfo.getServiceName()) || StringUtils.isEmpty(scenarioInfo.getRouteName()); + } + + @Override + protected RequestEntity.RequestType direct() { + return RequestEntity.RequestType.SERVER; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitException.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitException.java new file mode 100644 index 0000000000..fe1f62cec6 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitException.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.exceptions; + +import io.sermant.core.service.xds.entity.XdsHeaderOption; + +import java.util.List; + +/** + * rate limit injection exception + * + * @author zhp + * @since 2024-12-05 + */ +public class RateLimitException extends RuntimeException { + private List xdsHeaderOptions; + + /** + * Constructor + * + * @param xdsHeaderOptions Header name/value pair plus option + */ + public RateLimitException(List xdsHeaderOptions) { + this.xdsHeaderOptions = xdsHeaderOptions; + } + + public List getXdsHeaderOptions() { + return xdsHeaderOptions; + } + + public void setXdsHeaderOptions(List xdsHeaderOptions) { + this.xdsHeaderOptions = xdsHeaderOptions; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/FaultExceptionHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/FaultExceptionHandler.java index d34bd9f482..02812b3d49 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/FaultExceptionHandler.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/FaultExceptionHandler.java @@ -32,9 +32,11 @@ */ public class FaultExceptionHandler extends AbstractExceptionHandler { @Override - protected FlowControlResponse getFlowControlResponse(FaultException ex, - FlowControlResult flowControlResult) { + protected FlowControlResponse getFlowControlResponse(FaultException ex, FlowControlResult flowControlResult) { final FaultRule rule = ex.getRule(); + if (rule == null) { + return new FlowControlResponse(ex.getMsg(), ex.getCode()); + } if (RuleConstants.FAULT_RULE_FALLBACK_NULL_TYPE.equals(rule.getFallbackType())) { return new FlowControlResponse(ex.getMsg(), CommonConst.HTTP_OK, null); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/XdsRateLimitingExceptionHandler.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/XdsRateLimitingExceptionHandler.java new file mode 100644 index 0000000000..c2f8db2dcd --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/java/io/sermant/flowcontrol/res4j/handler/exception/XdsRateLimitingExceptionHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.handler.exception; + +import io.sermant.core.service.xds.entity.XdsHeader; +import io.sermant.core.service.xds.entity.XdsHeaderOption; +import io.sermant.core.utils.StringUtils; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResponse; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.res4j.exceptions.RateLimitException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * flow control exception handler + * + * @author zhouss + * @since 2024-12-05 + */ +public class XdsRateLimitingExceptionHandler extends AbstractExceptionHandler { + @Override + protected FlowControlResponse getFlowControlResponse(RateLimitException ex, FlowControlResult flowControlResult) { + Map> headers = new HashMap<>(); + for (XdsHeaderOption xdsHeaderOption : ex.getXdsHeaderOptions()) { + XdsHeader header = xdsHeaderOption.getHeader(); + if (header == null || StringUtils.isEmpty(header.getKey())) { + continue; + } + if (xdsHeaderOption.isEnabledAppend() && headers.containsKey(header.getKey())) { + List value = headers.get(header.getKey()); + value.set(0, value.get(0) + "," + header.getValue()); + continue; + } + headers.put(header.getKey(), Collections.singletonList(header.getValue())); + } + return new FlowControlResponse("Rate Limited", CommonConst.TOO_MANY_REQUEST_CODE, headers, null); + } + + @Override + public Class targetException() { + return RateLimitException.class; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler index 5d38258705..3ea32db77d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.chain.AbstractChainHandler @@ -24,3 +24,7 @@ io.sermant.flowcontrol.res4j.chain.handler.CircuitBreakerServerReqHandler io.sermant.flowcontrol.res4j.chain.handler.InstanceIsolationRequestHandler io.sermant.flowcontrol.res4j.chain.handler.FaultRequestHandler io.sermant.flowcontrol.res4j.chain.handler.SystemServerReqHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsRateLimitRequestHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsBusinessClientRequestHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsBusinessServerRequestHandler +io.sermant.flowcontrol.res4j.chain.handler.XdsFaultRequestHandler diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.handler.exception.ExceptionHandler b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.handler.exception.ExceptionHandler index c2d891b4c5..348d2b7ea9 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.handler.exception.ExceptionHandler +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/main/resources/META-INF/services/io.sermant.flowcontrol.res4j.handler.exception.ExceptionHandler @@ -21,3 +21,4 @@ io.sermant.flowcontrol.res4j.handler.exception.CircuitExceptionHandler io.sermant.flowcontrol.res4j.handler.exception.InstanceIsolationExceptionHandler io.sermant.flowcontrol.res4j.handler.exception.RateLimitingExceptionHandler io.sermant.flowcontrol.res4j.handler.exception.SystemRuleExceptionHandler +io.sermant.flowcontrol.res4j.handler.exception.XdsRateLimitingExceptionHandler diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilderTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilderTest.java index 4989bca191..8ed1a6a3be 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilderTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainBuilderTest.java @@ -17,11 +17,17 @@ package io.sermant.flowcontrol.res4j.chain; +import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.res4j.service.ServiceCollectorService; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.util.List; import java.util.Optional; @@ -35,6 +41,20 @@ public class HandlerChainBuilderTest { private static final String FIELD_NAME = "HANDLERS"; + private MockedStatic pluginConfigManagerMockedStatic; + + @Before + public void setUp() { + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); + } + + @After + public void tearDown() { + pluginConfigManagerMockedStatic.close(); + } + /** * ChainBuilder,determine the number of handlers */ diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntryTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntryTest.java index 04374f2709..a8e8eefad2 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntryTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainEntryTest.java @@ -21,6 +21,7 @@ import io.sermant.core.operation.converter.api.YamlConverter; import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.entity.DubboRequestEntity; import io.sermant.flowcontrol.common.entity.FlowControlResult; import io.sermant.flowcontrol.common.entity.HttpRequestEntity.Builder; @@ -44,7 +45,7 @@ * @since 2022-08-30 */ public class HandlerChainEntryTest { - private final HandlerChainEntry instance = HandlerChainEntry.INSTANCE; + private HandlerChainEntry instance; private final String sourceName = this.getClass().getName(); @@ -74,8 +75,12 @@ public void setUp() { pluginConfigManagerMockedStatic .when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(new FlowControlConfig()); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); operationManagerMockedStatic = Mockito.mockStatic(OperationManager.class); - operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)).thenReturn(new YamlConverterImpl()); + operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)) + .thenReturn(new YamlConverterImpl()); + instance = HandlerChainEntry.INSTANCE; } /** diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainTest.java index 4f4b9355c9..922cf612cf 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/HandlerChainTest.java @@ -21,6 +21,9 @@ import io.sermant.core.operation.OperationManager; import io.sermant.core.operation.converter.api.YamlConverter; +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.common.entity.HttpRequestEntity.Builder; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; @@ -49,16 +52,23 @@ public class HandlerChainTest { private MockedStatic operationManagerMockedStatic; + private MockedStatic pluginConfigManagerMockedStatic; + @Before public void setUp() { operationManagerMockedStatic = Mockito.mockStatic(OperationManager.class); - operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)).thenReturn(new YamlConverterImpl()); + operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)) + .thenReturn(new YamlConverterImpl()); + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); } // The mock static method needs to be closed when it is finished @After - public void tearDown() throws Exception { + public void tearDown() { operationManagerMockedStatic.close(); + pluginConfigManagerMockedStatic.close(); } /** @@ -86,32 +96,34 @@ public void testChain() { final RequestContext requestContext = ChainContext.getThreadLocalContext("test"); final HttpRequestEntity build = new Builder().setRequestType(RequestType.CLIENT).setApiPath("/api").build(); requestContext.setRequestEntity(build); + FlowControlScenario scenarioInfo = new FlowControlScenario(); final Set businessNames = Collections.singleton("test"); final Exception exception = new IllegalArgumentException("error"); final Object result = new Object(); - handlerChain.onBefore(requestContext, businessNames); - handlerChain.onThrow(requestContext, businessNames, exception); - handlerChain.onResult(requestContext, businessNames, result); + scenarioInfo.setMatchedScenarioNames(businessNames); + handlerChain.onBefore(requestContext, scenarioInfo); + handlerChain.onThrow(requestContext, scenarioInfo, exception); + handlerChain.onResult(requestContext, scenarioInfo, result); Mockito.verify(bulkheadRequestHandler, Mockito.times(1)) - .onBefore(requestContext, businessNames); + .onBefore(requestContext, scenarioInfo); Mockito.verify(bulkheadRequestHandler, Mockito.times(1)) - .onThrow(requestContext, businessNames, exception); + .onThrow(requestContext, scenarioInfo, exception); Mockito.verify(bulkheadRequestHandler, Mockito.times(1)) - .onResult(requestContext, businessNames, result); + .onResult(requestContext, scenarioInfo, result); Mockito.verify(circuitBreakerClientReqHandler, Mockito.times(1)) - .onBefore(requestContext, businessNames); + .onBefore(requestContext, scenarioInfo); Mockito.verify(circuitBreakerClientReqHandler, Mockito.times(1)) - .onThrow(requestContext, businessNames, exception); + .onThrow(requestContext, scenarioInfo, exception); Mockito.verify(circuitBreakerClientReqHandler, Mockito.times(1)) - .onResult(requestContext, businessNames, result); + .onResult(requestContext, scenarioInfo, result); Mockito.verify(faultRequestHandler, Mockito.times(1)) - .onBefore(requestContext, businessNames); + .onBefore(requestContext, scenarioInfo); Mockito.verify(faultRequestHandler, Mockito.times(1)) - .onThrow(requestContext, businessNames, exception); + .onThrow(requestContext, scenarioInfo, exception); Mockito.verify(faultRequestHandler, Mockito.times(1)) - .onResult(requestContext, businessNames, result); + .onResult(requestContext, scenarioInfo, result); } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/context/ChainContextTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/context/ChainContextTest.java index 58327d4873..4952e4ad3c 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/context/ChainContextTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/context/ChainContextTest.java @@ -48,6 +48,9 @@ public void getThreadLocalContext() { ChainContext.getThreadLocalContext("test125"); ChainContext.getThreadLocalContext("test126"); ChainContext.getThreadLocalContext("test127"); + ChainContext.getThreadLocalContext("test128"); + ChainContext.getThreadLocalContext("test130"); + ChainContext.getThreadLocalContext("test129"); try { ChainContext.getThreadLocalContext("test5"); } finally { diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitRequestHandlerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitRequestHandlerTest.java index 921045f174..7747dc413d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitRequestHandlerTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/CircuitRequestHandlerTest.java @@ -21,6 +21,7 @@ import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.ResolverManager; import io.sermant.flowcontrol.common.core.resolver.CircuitBreakerRuleResolver; import io.sermant.flowcontrol.common.core.rule.CircuitBreakerRule; @@ -67,6 +68,8 @@ public void setUp() { pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); pluginConfigManagerMockedStatic.when(()->PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(flowControlConfig); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); } // The mock static method needs to be closed when it is finished diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/RequestHandlerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/RequestHandlerTest.java index d8f72a81d5..af4a33c40c 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/RequestHandlerTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/chain/handler/RequestHandlerTest.java @@ -21,6 +21,7 @@ import io.sermant.core.operation.converter.api.YamlConverter; import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.ResolverManager; import io.sermant.flowcontrol.common.core.match.MatchGroupResolver; import io.sermant.flowcontrol.res4j.chain.HandlerChainEntry; @@ -63,7 +64,8 @@ public class RequestHandlerTest { @Before public void setUp() { operationManagerMockedStatic = Mockito.mockStatic(OperationManager.class); - operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)).thenReturn(new YamlConverterImpl()); + operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)) + .thenReturn(new YamlConverterImpl()); pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); FlowControlConfig flowControlConfig = new FlowControlConfig(); flowControlConfig.setEnableStartMonitor(true); @@ -72,6 +74,8 @@ public void setUp() { pluginConfigManagerMockedStatic .when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(flowControlConfig); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); publishMatchGroup(); loadTests(); entry = HandlerChainEntry.INSTANCE; diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitExceptionTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitExceptionTest.java new file mode 100644 index 0000000000..2164c3a2ad --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/exceptions/RateLimitExceptionTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.res4j.exceptions; + +import io.sermant.core.service.xds.entity.XdsHeaderOption; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +/** + * rate limit injection exception Test + * + * @author zhp + * @since 2024-12-05 + */ +public class RateLimitExceptionTest { + private RateLimitException rateLimitExceptionUnderTest; + + @Before + public void setUp() { + rateLimitExceptionUnderTest = new RateLimitException(Collections.singletonList(new XdsHeaderOption())); + } + + @Test + public void testXdsHeaderOptionsGetterAndSetter() { + final List xdsHeaderOptions = Collections.singletonList(new XdsHeaderOption()); + rateLimitExceptionUnderTest.setXdsHeaderOptions(xdsHeaderOptions); + Assert.assertEquals(xdsHeaderOptions, rateLimitExceptionUnderTest.getXdsHeaderOptions()); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/handler/HandlerTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/handler/HandlerTest.java index bf2071f855..549918f5bc 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/handler/HandlerTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/handler/HandlerTest.java @@ -25,6 +25,7 @@ import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.constants.RuleConstants; import io.sermant.flowcontrol.common.core.rule.BulkheadRule; import io.sermant.flowcontrol.common.core.rule.CircuitBreakerRule; @@ -76,6 +77,8 @@ public void setUp() { pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); pluginConfigManagerMockedStatic.when(()->PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(flowControlConfig); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); } // The mock static method needs to be closed when it is finished diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/DubboRest4jServiceImplTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/DubboRest4jServiceImplTest.java index b6b2759a9a..76fb42d511 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/DubboRest4jServiceImplTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/DubboRest4jServiceImplTest.java @@ -21,6 +21,7 @@ import io.sermant.core.operation.converter.api.YamlConverter; import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.entity.FlowControlResult; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.res4j.chain.context.ChainContext; @@ -48,6 +49,8 @@ public void setUp() { pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(new FlowControlConfig()); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); operationManagerMockedStatic = Mockito.mockStatic(OperationManager.class); operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)).thenReturn(new YamlConverterImpl()); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/HttpRest4jServiceImplTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/HttpRest4jServiceImplTest.java index a9b13b4d23..66c654f9f7 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/HttpRest4jServiceImplTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-service/src/test/java/io/sermant/flowcontrol/res4j/service/HttpRest4jServiceImplTest.java @@ -21,6 +21,7 @@ import io.sermant.core.operation.converter.api.YamlConverter; import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.entity.FlowControlResult; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.res4j.chain.context.ChainContext; @@ -48,6 +49,8 @@ public void setUp() { pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class)) .thenReturn(new FlowControlConfig()); + pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(new XdsFlowControlConfig()); operationManagerMockedStatic = Mockito.mockStatic(OperationManager.class); operationManagerMockedStatic.when(() -> OperationManager.getOperation(YamlConverter.class)).thenReturn(new YamlConverterImpl()); }