Skip to content

Commit

Permalink
Add XDS flow control function
Browse files Browse the repository at this point in the history
Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Dec 30, 2024
1 parent c66c14a commit a0ff1ca
Show file tree
Hide file tree
Showing 44 changed files with 1,053 additions and 180 deletions.
13 changes: 6 additions & 7 deletions sermant-plugins/sermant-flowcontrol/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ flow.control.plugin:
xds.flow.control.config:
# Whether to enable Xds flow control
enable: false
retry:
# The specified response status codes that need to be retried. Retry will be performed when the response's status
# code matches one of the specified codes.
x-sermant-retriable-status-codes:
# The specified response header names that need to be retried. Retry will be performed when the response contains
# the specified headers.
x-sermant-retriable-header-names:
# The specified response status codes that need to be retried. Retry will be performed when the response's status
# code matches one of the specified codes.
x-sermant-retriable-status-codes:
# The specified response header names that need to be retried. Retry will be performed when the response contains
# the specified headers.
x-sermant-retriable-header-names:
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public class XdsFlowControlConfig implements PluginConfig {
/**
* Specify the response code for retry, and retry will be executed when the response code is included
*/
@ConfigFieldKey("retry.x-sermant-retriable-status-codes")
@ConfigFieldKey("x-sermant-retriable-status-codes")
private List<String> retryStatusCodes;

/**
* Specify the response code for retry, and retry will be executed when the response header is included
*/
@ConfigFieldKey("retry.x-sermant-retriable-header-names")
@ConfigFieldKey("x-sermant-retriable-header-names")
private List<String> retryHeaderNames;

/**
Expand Down
21 changes: 21 additions & 0 deletions sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
<netflix-core.version>1.4.7.RELEASE</netflix-core.version>
<spring.cloud.context.version>2.2.0.RELEASE</spring.cloud.context.version>
<google.guava>31.1-jre</google.guava>
<apache-httpclient.version>4.5.13</apache-httpclient.version>
<okhttp.version>4.11.0</okhttp.version>
<okhttp.sq.version>2.7.5</okhttp.sq.version>
</properties>
<dependencies>
<!--compile-->
Expand Down Expand Up @@ -111,6 +114,24 @@
<version>${google.guava}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${apache-httpclient.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.sq.version}</version>
<scope>provided</scope>
</dependency>
<!--test-->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.sermant.flowcontrol;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.LogUtils;
import io.sermant.core.utils.ReflectUtils;
import io.sermant.flowcontrol.common.config.ConfigConst;
Expand All @@ -30,6 +31,7 @@
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -120,13 +122,21 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception
return context;
}
chooseHttpService().onBefore(className, httpRequestEntity.get(), result);
if (result.isSkip()) {
context.skip(null);
final Object response = allArguments[1];
if (response != null) {
setStatus.accept(response, result.getResponse().getCode());
getWriter.apply(response).print(result.buildResponseMsg());
if (!result.isSkip()) {
return context;
}
context.skip(null);
final Object response = allArguments[1];
if (response == null) {
return context;
}
setStatus.accept(response, result.getResponse().getCode());
getWriter.apply(response).print(result.buildResponseMsg());
for (Map.Entry<String, List<String>> entry : result.getResponse().getHeaders().entrySet()) {
if (CollectionUtils.isEmpty(entry.getValue())) {
continue;
}
setResponseHeader(response, entry.getKey(), entry.getValue().get(0));
}
return context;
}
Expand Down Expand Up @@ -167,6 +177,11 @@ private String getHeader(Object httpServletRequest, String key) {
new Object[]{key}).orElse(null);
}

private Optional<Object> setResponseHeader(Object httpServletResponse, String key, String value) {
return ReflectUtils.invokeMethod(httpServletResponse, "setHeader",
new Class[]{String.class, String.class}, new Object[]{key, value});
}

private PrintWriter getWriter(Object httpServletRequest) {
return (PrintWriter) ReflectUtils.invokeMethodWithNoneParameter(httpServletRequest, "getWriter")
.orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -209,10 +212,29 @@ public static class FeignRetry extends AbstractRetry {

@Override
public Optional<String> getCode(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "status");
return resultOptional.map(String::valueOf);
}

@Override
public Optional<Set<String>> getHeaderNames(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "headers");
if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) {
return Optional.empty();
}
Map<?, ?> headers = (Map<?, ?>) resultOptional.get();
Set<String> headerNames = new HashSet<>();
for (Map.Entry<?, ?> entry : headers.entrySet()) {
headerNames.add(entry.getKey().toString());
}
return Optional.of(headerNames);
}

private Optional<Object> getMethodResult(Object result, String methodName) {
final Optional<Method> status = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> {
final Method method;
try {
method = result.getClass().getDeclaredMethod("status");
method = result.getClass().getDeclaredMethod(methodName);
method.setAccessible(true);
return method;
} catch (NoSuchMethodException ex) {
Expand All @@ -225,7 +247,7 @@ public Optional<String> getCode(Object result) {
return Optional.empty();
}
try {
return Optional.of(String.valueOf(status.get().invoke(result)));
return Optional.of(status.get().invoke(result));
} catch (IllegalAccessException ex) {
LOGGER.warning(String.format(Locale.ENGLISH, "Can not find method status from class [%s]!",
result.getClass().getCanonicalName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType;
import io.sermant.flowcontrol.common.handler.retry.AbstractRetry;
import io.sermant.flowcontrol.common.handler.retry.RetryContext;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import io.sermant.flowcontrol.inject.DefaultClientHttpResponse;
import io.sermant.flowcontrol.inject.RetryClientHttpResponse;
import io.sermant.flowcontrol.service.InterceptorSupporter;
Expand All @@ -39,9 +40,12 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Logger;

Expand Down Expand Up @@ -95,12 +99,12 @@ protected final ExecuteContext doBefore(ExecuteContext context) {
if (flowControlResult.isSkip()) {
context.skip(new DefaultClientHttpResponse(flowControlResult));
} else {
tryExeWithRetry(context);
tryExeWithRetry(context, httpRequestEntity.get());
}
return context;
}

private void tryExeWithRetry(ExecuteContext context) {
private void tryExeWithRetry(ExecuteContext context, HttpRequestEntity httpRequestEntity) {
final Object[] allArguments = context.getArguments();
final HttpRequest request = (HttpRequest) context.getObject();
Object result;
Expand All @@ -118,12 +122,8 @@ private void tryExeWithRetry(ExecuteContext context) {
}
context.afterMethod(result, ex);
try {
final Optional<HttpRequestEntity> httpRequestEntity = convertToHttpEntity(request);
if (!httpRequestEntity.isPresent()) {
return;
}
RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity.get());
final List<Retry> handlers = getRetryHandler().getHandlers(httpRequestEntity.get());
RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity);
final List<Retry> handlers = getRetryHandler().getHandlers(httpRequestEntity);
if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) {
// retry only one policy
request.getHeaders().add(RETRY_KEY, RETRY_VALUE);
Expand All @@ -143,6 +143,7 @@ private void tryExeWithRetry(ExecuteContext context) {
@Override
protected ExecuteContext doThrow(ExecuteContext context) {
chooseHttpService().onThrow(className, context.getThrowable());
XdsThreadLocalUtil.removeSendByteFlag();
return context;
}

Expand All @@ -155,6 +156,7 @@ protected final ExecuteContext doAfter(ExecuteContext context) throws IOExceptio
chooseHttpService().onThrow(className, defaultException);
}
chooseHttpService().onAfter(className, context.getResult());
XdsThreadLocalUtil.removeSendByteFlag();
return context;
}

Expand All @@ -176,9 +178,38 @@ public static class HttpRetry extends AbstractRetry {

@Override
public Optional<String> getCode(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "getRawStatusCode");
return resultOptional.map(String::valueOf);
}

@Override
public Optional<Set<String>> getHeaderNames(Object result) {
Optional<Object> resultOptional = getMethodResult(result, "getHeaders");
if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) {
return Optional.empty();
}
Map<?, ?> headers = (Map<?, ?>) resultOptional.get();
Set<String> headerNames = new HashSet<>();
for (Map.Entry<?, ?> entry : headers.entrySet()) {
headerNames.add(entry.getKey().toString());
}
return Optional.of(headerNames);
}

@Override
public Class<? extends Throwable>[] retryExceptions() {
return getRetryExceptions();
}

@Override
public RetryFramework retryType() {
return RetryFramework.SPRING_CLOUD;
}

private Optional<Object> getMethodResult(Object result, String methodName) {
final Optional<Method> getRawStatusCode = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> {
try {
final Method method = result.getClass().getDeclaredMethod("getRawStatusCode");
final Method method = result.getClass().getDeclaredMethod(methodName);
method.setAccessible(true);
return method;
} catch (NoSuchMethodException ex) {
Expand All @@ -192,7 +223,7 @@ public Optional<String> getCode(Object result) {
return Optional.empty();
}
try {
return Optional.of(String.valueOf(getRawStatusCode.get().invoke(result)));
return Optional.of(getRawStatusCode.get().invoke(result));
} catch (IllegalAccessException ex) {
LOGGER.warning(String.format(Locale.ENGLISH,
"Can not find method getRawStatusCode from class [%s]!",
Expand All @@ -203,15 +234,5 @@ public Optional<String> getCode(Object result) {
}
return Optional.empty();
}

@Override
public Class<? extends Throwable>[] retryExceptions() {
return getRetryExceptions();
}

@Override
public RetryFramework retryType() {
return RetryFramework.SPRING_CLOUD;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package io.sermant.flowcontrol.retry.handler;

import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.ReflectUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.core.rule.RetryRule;
import io.sermant.flowcontrol.common.exception.InvokerWrapperException;
import io.sermant.flowcontrol.common.handler.retry.Retry;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
import io.sermant.flowcontrol.common.xds.retry.RetryConditionType;

import java.io.IOException;
import java.net.ConnectException;
Expand Down Expand Up @@ -67,6 +72,12 @@ public Predicate<Throwable> createExceptionPredicate(Class<? extends Throwable>[
.orElseGet(() -> throwable -> true);
}

@Override
public Predicate<Throwable> createExceptionPredicate(Class<? extends Throwable>[] retryExceptions,
XdsRetryPolicy policy) {
return (Throwable ex) -> needRetry(ex, policy);
}

private Predicate<Throwable> createExceptionPredicate(Class<? extends Throwable> retryClass) {
return (Throwable ex) -> {
if (retryClass.isAssignableFrom(getRealExceptionClass(ex))) {
Expand Down Expand Up @@ -120,4 +131,58 @@ public Predicate<Object> createResultPredicate(Retry retry, RetryRule rule) {
}
return result -> retry.needRetry(new HashSet<>(retryOnResponseStatus), result);
}

@Override
public Predicate<Object> createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy) {
return result -> needRetry(retry, result, xdsRetryPolicy);
}

private boolean needRetry(Retry retry, Object result, XdsRetryPolicy retryPolicy) {
List<String> conditions = getRetryConditions(retryPolicy);
if (CollectionUtils.isEmpty(conditions)) {
return false;
}
Optional<String> statusCodeOptional = retry.getCode(result);
if (!statusCodeOptional.isPresent()) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
boolean needRetry = retryConditionOptional.get().needRetry(retry, null, statusCode, result);
if (needRetry) {
return true;
}
}
return false;
}

private boolean needRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
List<String> conditions = getRetryConditions(retryPolicy);
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
boolean needRetry = retryConditionOptional.get().needRetry(null, ex, null, null);
if (needRetry) {
return true;
}
}
return false;
}

private static List<String> getRetryConditions(XdsRetryPolicy xdsRetryPolicy) {
String retryOn = xdsRetryPolicy.getRetryOn();
if (StringUtils.isExist(retryOn)) {
return Arrays.asList(retryOn.split(","));
}
return Collections.emptyList();
}
}
Loading

0 comments on commit a0ff1ca

Please sign in to comment.