From 91c6a96e29a7613d76ab62b43419eb740a38415b Mon Sep 17 00:00:00 2001 From: hanbingleixue Date: Sat, 21 Dec 2024 10:51:15 +0800 Subject: [PATCH] Add XDS retry and circuit-breaking functionality to the flow control plugin Signed-off-by: hanbingleixue --- .../flowcontrol-plugin/pom.xml | 35 ++ .../flowcontrol/AbstractXdsDeclarer.java | 36 +++ .../AbstractXdsHttpClientInterceptor.java | 247 +++++++++++++++ .../DispatcherServletInterceptor.java | 32 +- .../entity/ErrorCloseableHttpResponse.java | 199 ++++++++++++ .../entity/HttpAsyncInvokerResult.java | 62 ++++ .../HttpAsyncRequestProducerDecorator.java | 98 ++++++ .../inject/DefaultClientHttpResponse.java | 14 +- .../retry/FeignRequestInterceptor.java | 26 +- ...ttpClientConnectionSendHeaderDeclarer.java | 49 +++ .../retry/HttpRequestInterceptor.java | 79 +++-- .../HttpRequestSendHeaderInterceptor.java | 46 +++ .../retry/OkHttpSendHeaderDeclarer.java | 49 +++ .../SpringLbChooseServerInterceptor.java | 75 ++--- .../SpringRibbonChooseServerInterceptor.java | 46 ++- .../client/HttpAsyncClient4xDeclarer.java | 56 ++++ .../client/HttpAsyncClient4xInterceptor.java | 299 ++++++++++++++++++ .../retry/client/HttpClient4xDeclarer.java | 57 ++++ .../retry/client/HttpClient4xInterceptor.java | 228 +++++++++++++ .../handler/DefaultRetryPredicateCreator.java | 61 ++++ .../retry/handler/RetryHandlerV2.java | 38 ++- .../retry/handler/RetryPredicateCreator.java | 19 ++ .../service/InterceptorSupporter.java | 4 + ....core.plugin.agent.declarer.PluginDeclarer | 10 + ...io.sermant.core.plugin.config.PluginConfig | 1 + .../SpringLbChooseServerInterceptorTest.java | 29 +- ...ringRibbonChooseServerInterceptorTest.java | 47 ++- .../DefaultRetryPredicateCreatorTest.java | 10 + 28 files changed, 1847 insertions(+), 105 deletions(-) create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/ErrorCloseableHttpResponse.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncInvokerResult.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncRequestProducerDecorator.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xDeclarer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xInterceptor.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java create mode 100644 sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml index 325e583d05..636016879c 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml @@ -27,6 +27,11 @@ 1.4.7.RELEASE 2.2.0.RELEASE 31.1-jre + 2.2.5 + 4.5.13 + 4.1.5 + 4.11.0 + 2.7.5 @@ -99,6 +104,12 @@ ${netflix-core.version} provided + + com.netflix.ribbon + ribbon-loadbalancer + ${ribbon.version} + provided + jakarta.annotation jakarta.annotation-api @@ -111,6 +122,24 @@ ${google.guava} provided + + org.apache.httpcomponents + httpclient + ${apache-httpclient.version} + provided + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + provided + + + com.squareup.okhttp + okhttp + ${okhttp.sq.version} + provided + junit @@ -136,6 +165,12 @@ ${project.version} test + + org.apache.httpcomponents + httpasyncclient + ${http.client.async.verion} + provided + diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java new file mode 100644 index 0000000000..34a00a71c9 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; + +/** + * okhttp request declarer + * + * @author zhp + * @since 2024-11-30 + */ +public abstract class AbstractXdsDeclarer extends AbstractPluginDeclarer { + private final XdsFlowControlConfig config = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); + + @Override + public boolean isEnabled() { + return config.isEnable(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java new file mode 100644 index 0000000000..e1e2727330 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java @@ -0,0 +1,247 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; +import io.sermant.flowcontrol.common.util.StringUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.common.xds.circuit.CircuitBreakerManager; +import io.sermant.flowcontrol.common.xds.handler.XdsFlowControlHandler; +import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancer; +import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancerFactory; +import io.sermant.flowcontrol.service.InterceptorSupporter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * Enhance the client request sending functionality by performing Xds service instance discovery and circuit breaking + * during the request sending process + * + * @author zhp + * @since 2024-11-30 + */ +public abstract class AbstractXdsHttpClientInterceptor extends InterceptorSupporter { + protected static final String MESSAGE = "CircuitBreaker has forced open and deny any requests"; + + private static final int MIN_SUCCESS_CODE = 200; + + private static final int MAX_SUCCESS_CODE = 399; + + private static final int HUNDRED = 100; + + /** + * Perform circuit breaker judgment and handling + * + * @return The result of whether circuit breaking is needed + */ + public boolean needsCircuitBreak() { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getAddress())) { + return false; + } + Optional circuitBreakersOptional = XdsFlowControlHandler.INSTANCE. + getRequestCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); + if (!circuitBreakersOptional.isPresent()) { + return false; + } + int activeRequestNum = CircuitBreakerManager.incrementActiveRequests(scenarioInfo.getServiceName(), + scenarioInfo.getClusterName(), scenarioInfo.getAddress()); + int maxRequest = circuitBreakersOptional.get().getMaxRequests(); + return maxRequest != 0 && activeRequestNum > maxRequest; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + XdsThreadLocalUtil.removeSendByteFlag(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (context.getThrowable() != null || scenarioInfo == null) { + return context; + } + decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo); + return context; + } + + private void decrementActiveRequestsAndCountFailureRequests(ExecuteContext context, + FlowControlScenario scenarioInfo) { + CircuitBreakerManager.decrementActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getServiceName(), + scenarioInfo.getAddress()); + int statusCode = getStatusCode(context); + if (statusCode >= MIN_SUCCESS_CODE && statusCode <= MAX_SUCCESS_CODE) { + return; + } + countFailedRequests(scenarioInfo, statusCode); + } + + @Override + public ExecuteContext doThrow(ExecuteContext context) { + XdsThreadLocalUtil.removeSendByteFlag(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo == null) { + return context; + } + decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo); + return context; + } + + /** + * handler failure request + * + * @param statusCode response code + * @param scenario scenario information + */ + private void countFailedRequests(FlowControlScenario scenario, int statusCode) { + CircuitBreakerManager.decrementActiveRequests(scenario.getServiceName(), scenario.getClusterName(), + scenario.getAddress()); + Optional instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE. + getInstanceCircuitBreakers(scenario.getServiceName(), scenario.getClusterName()); + if (!instanceCircuitBreakersOptional.isPresent()) { + return; + } + XdsInstanceCircuitBreakers circuitBreakers = instanceCircuitBreakersOptional.get(); + CircuitBreakerManager.countFailureRequest(scenario, scenario.getAddress(), statusCode, circuitBreakers); + } + + /** + * Get status code + * + * @param context The execution context of the Interceptor + * @return response code + */ + protected abstract int getStatusCode(ExecuteContext context); + + /** + * choose serviceInstance by xds rule + * + * @return result + */ + protected Optional chooseServiceInstanceForXds() { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo == null || io.sermant.core.utils.StringUtils.isBlank(scenarioInfo.getServiceName()) + || io.sermant.core.utils.StringUtils.isEmpty(scenarioInfo.getClusterName())) { + return Optional.empty(); + } + Set serviceInstanceSet = XdsFlowControlHandler.INSTANCE. + getAllServerInstance(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); + if (serviceInstanceSet.isEmpty()) { + return Optional.empty(); + } + boolean needRetry = RetryContext.INSTANCE.isPolicyNeedRetry(); + if (needRetry) { + removeRetriedServiceInstance(serviceInstanceSet); + } + removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet); + return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo)); + } + + private void removeRetriedServiceInstance(Set serviceInstanceSet) { + RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy(); + retryPolicy.retryMark(); + List retriedInstance = retryPolicy.getAllRetriedInstance(); + Set allInstance = new HashSet<>(serviceInstanceSet); + for (Object retryInstance : retriedInstance) { + if (retryInstance instanceof ServiceInstance) { + serviceInstanceSet.remove(retryInstance); + } + } + if (CollectionUtils.isEmpty(serviceInstanceSet)) { + serviceInstanceSet.addAll(allInstance); + } + } + + private ServiceInstance chooseServiceInstanceByLoadBalancer(Set instanceSet, + FlowControlScenario scenarioInfo) { + XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory.getLoadBalancer(scenarioInfo.getServiceName(), + scenarioInfo.getClusterName()); + return loadBalancer.selectInstance(new ArrayList<>(instanceSet)); + } + + private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set instanceSet) { + Optional instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE. + getInstanceCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); + if (!instanceCircuitBreakersOptional.isPresent()) { + return; + } + XdsInstanceCircuitBreakers outlierDetection = instanceCircuitBreakersOptional.get(); + int count = instanceSet.size(); + if (checkMinInstanceNum(outlierDetection, count)) { + return; + } + List circuitBreakerInstances = new ArrayList<>(); + float maxCircuitBreakerPercent = (float) outlierDetection.getMaxEjectionPercent() / HUNDRED; + int maxCircuitBreakerInstances = (int) Math.floor(count * maxCircuitBreakerPercent); + for (ServiceInstance serviceInstance : instanceSet) { + if (hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) { + break; + } + String address = serviceInstance.getHost() + CommonConst.CONNECT + serviceInstance.getPort(); + if (CircuitBreakerManager.needCircuitBreaker(scenarioInfo, address, outlierDetection)) { + circuitBreakerInstances.add(serviceInstance); + } + } + if (checkHealthInstanceNum(count, outlierDetection, circuitBreakerInstances.size())) { + return; + } + circuitBreakerInstances.forEach(instanceSet::remove); + } + + private boolean hasReachedCircuitBreakerThreshold(List circuitBreakerInstances, + int maxCircuitBreakerInstances) { + return circuitBreakerInstances.size() >= maxCircuitBreakerInstances; + } + + private boolean checkHealthInstanceNum(int count, XdsInstanceCircuitBreakers outlierDetection, int size) { + return count * outlierDetection.getMinHealthPercent() >= (count - size); + } + + private boolean checkMinInstanceNum(XdsInstanceCircuitBreakers outlierDetection, int count) { + return outlierDetection.getFailurePercentageMinimumHosts() > count; + } + + @Override + protected boolean canInvoke(ExecuteContext context) { + return true; + } + + /** + * Get Retry Handler + * + * @return Retry Handlers + */ + protected List getRetryHandlers() { + if (XdsThreadLocalUtil.getScenarioInfo() != null) { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + RetryContext.INSTANCE.buildXdsRetryPolicy(scenarioInfo); + return getRetryHandler().getXdsHandlers(scenarioInfo); + } + return Collections.emptyList(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java index 93f550bf41..48375de04a 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol; import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.LogUtils; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.common.config.ConfigConst; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -120,13 +122,21 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception return context; } chooseHttpService().onBefore(className, httpRequestEntity.get(), result); - if (result.isSkip()) { - context.skip(null); - final Object response = allArguments[1]; - if (response != null) { - setStatus.accept(response, result.getResponse().getCode()); - getWriter.apply(response).print(result.buildResponseMsg()); + if (!result.isSkip()) { + return context; + } + context.skip(null); + final Object response = allArguments[1]; + if (response == null) { + return context; + } + setStatus.accept(response, result.getResponse().getCode()); + getWriter.apply(response).print(result.buildResponseMsg()); + for (Map.Entry> entry : result.getResponse().getHeaders().entrySet()) { + if (CollectionUtils.isEmpty(entry.getValue())) { + continue; } + setResponseHeader(response, entry.getKey(), entry.getValue().get(0)); } return context; } @@ -167,6 +177,16 @@ private String getHeader(Object httpServletRequest, String key) { new Object[]{key}).orElse(null); } + private Optional setResponseHeader(Object httpServletResponse, String key, String value) { + return ReflectUtils.invokeMethod(httpServletResponse, "setHeader", + new Class[]{String.class, String.class}, new Object[]{key, value}); + } + + private String getResponseHeader(Object httpServletResponse, String key) { + return (String) ReflectUtils.invokeMethod(httpServletResponse, "getHeader", new Class[]{String.class}, + new Object[]{key}).orElse(null); + } + private PrintWriter getWriter(Object httpServletRequest) { return (PrintWriter) ReflectUtils.invokeMethodWithNoneParameter(httpServletRequest, "getWriter") .orElse(null); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/ErrorCloseableHttpResponse.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/ErrorCloseableHttpResponse.java new file mode 100644 index 0000000000..e04e3da756 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/ErrorCloseableHttpResponse.java @@ -0,0 +1,199 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.entity; + +import org.apache.http.Header; +import org.apache.http.HeaderIterator; +import org.apache.http.HttpEntity; +import org.apache.http.HttpVersion; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; +import org.apache.http.message.BasicHeaderIterator; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpParams; + +import java.io.IOException; +import java.util.Locale; + +/** + * httpclient response + * + * @author zhouss + * @since 2024-12-20 + */ +public class ErrorCloseableHttpResponse implements CloseableHttpResponse { + private final int code; + + private final String message; + + private final ProtocolVersion protocolVersion; + + /** + * Constructor + * + * @param code Response code + * @param message error msg + * @param protocolVersion Request an agreement version + */ + public ErrorCloseableHttpResponse(int code, String message, ProtocolVersion protocolVersion) { + this.code = code; + this.message = message; + this.protocolVersion = protocolVersion; + } + + @Override + public void close() throws IOException { + + } + + @Override + public StatusLine getStatusLine() { + return new BasicStatusLine(HttpVersion.HTTP_1_1, this.code, this.message); + } + + @Override + public void setStatusLine(StatusLine statusline) { + + } + + @Override + public void setStatusLine(ProtocolVersion ver, int code) { + + } + + @Override + public void setStatusLine(ProtocolVersion ver, int code, String reason) { + + } + + @Override + public void setStatusCode(int code) throws IllegalStateException { + + } + + @Override + public void setReasonPhrase(String reason) throws IllegalStateException { + + } + + @Override + public HttpEntity getEntity() { + return new StringEntity(message == null ? "unKnow error" : message, ContentType.APPLICATION_JSON); + } + + @Override + public void setEntity(HttpEntity entity) { + } + + @Override + public Locale getLocale() { + return Locale.ENGLISH; + } + + @Override + public void setLocale(Locale loc) { + } + + @Override + public ProtocolVersion getProtocolVersion() { + return protocolVersion; + } + + @Override + public boolean containsHeader(String name) { + return false; + } + + @Override + public Header[] getHeaders(String name) { + return new Header[0]; + } + + @Override + public Header getFirstHeader(String name) { + return new BasicHeader("type", "SermantErrorResponse"); + } + + @Override + public Header getLastHeader(String name) { + return getFirstHeader(name); + } + + @Override + public Header[] getAllHeaders() { + return new Header[0]; + } + + @Override + public void addHeader(Header header) { + + } + + @Override + public void addHeader(String name, String value) { + + } + + @Override + public void setHeader(Header header) { + + } + + @Override + public void setHeader(String name, String value) { + + } + + @Override + public void setHeaders(Header[] headers) { + + } + + @Override + public void removeHeader(Header header) { + + } + + @Override + public void removeHeaders(String name) { + + } + + @Override + public HeaderIterator headerIterator() { + return this.headerIterator("errorHeaders"); + } + + @Override + public HeaderIterator headerIterator(String name) { + return new BasicHeaderIterator(new Header[0], name); + } + + @Override + public HttpParams getParams() { + return new BasicHttpParams(); + } + + @Override + public void setParams(HttpParams params) { + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncInvokerResult.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncInvokerResult.java new file mode 100644 index 0000000000..4302cd705a --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncInvokerResult.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.entity; + +/** + * HttpAsyncClient call result + * + * @author zhouss + * @since 2024-12-10 + */ +public class HttpAsyncInvokerResult { + private Object future; + + private Object result; + + /** + * Invoke the result + */ + public HttpAsyncInvokerResult() { + } + + /** + * Asynchronous call results + * + * @param future future + * @param result The result of the call, which may be an exception + */ + public HttpAsyncInvokerResult(Object future, Object result) { + this.future = future; + this.result = result; + } + + public Object getFuture() { + return future; + } + + public void setFuture(Object future) { + this.future = future; + } + + public Object getResult() { + return result; + } + + public void setResult(Object result) { + this.result = result; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncRequestProducerDecorator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncRequestProducerDecorator.java new file mode 100644 index 0000000000..968766392e --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/entity/HttpAsyncRequestProducerDecorator.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.entity; + +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.protocol.HttpContext; + +import java.io.IOException; +import java.util.function.Function; + +/** + * HttpAsyncRequestProducer modifier, handling http request + * + * @author zhouss + * @since 2024-12-20 + */ +public class HttpAsyncRequestProducerDecorator implements HttpAsyncRequestProducer { + private final HttpAsyncRequestProducer httpAsyncRequestProducer; + + private final Function requestDecorateFunc; + + private final Function hostDecorateFunc; + + /** + * Constructor + * + * @param httpAsyncRequestProducer Original Producer + * @param requestDecorateFunc Request a decorator + * @param hostDecorateFunc Address decorator + */ + public HttpAsyncRequestProducerDecorator( + HttpAsyncRequestProducer httpAsyncRequestProducer, + Function requestDecorateFunc, + Function hostDecorateFunc) { + this.httpAsyncRequestProducer = httpAsyncRequestProducer; + this.requestDecorateFunc = requestDecorateFunc; + this.hostDecorateFunc = hostDecorateFunc; + } + + @Override + public HttpHost getTarget() { + return hostDecorateFunc.apply(httpAsyncRequestProducer.getTarget()); + } + + @Override + public HttpRequest generateRequest() throws IOException, HttpException { + return requestDecorateFunc.apply(httpAsyncRequestProducer.generateRequest()); + } + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException { + httpAsyncRequestProducer.produceContent(encoder, ioControl); + } + + @Override + public void requestCompleted(HttpContext context) { + httpAsyncRequestProducer.requestCompleted(context); + } + + @Override + public void failed(Exception ex) { + httpAsyncRequestProducer.failed(ex); + } + + @Override + public boolean isRepeatable() { + return httpAsyncRequestProducer.isRepeatable(); + } + + @Override + public void resetRequest() throws IOException { + httpAsyncRequestProducer.resetRequest(); + } + + @Override + public void close() throws IOException { + httpAsyncRequestProducer.close(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java index 25a1d29074..e14240e15d 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/DefaultClientHttpResponse.java @@ -29,6 +29,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,6 +49,8 @@ public class DefaultClientHttpResponse implements ClientHttpResponse { private final String msg; + private final HttpHeaders responseHeaders; + private InputStream responseStream; /** @@ -63,6 +67,11 @@ public DefaultClientHttpResponse(FlowControlResult flowControlResult) { this.contentType = "application/text;charset=utf-8"; } this.msg = flowControlResult.buildResponseMsg(); + responseHeaders = new HttpHeaders(); + Map> headers = response.getHeaders(); + if (headers != null) { + responseHeaders.putAll(headers); + } } @Override @@ -99,8 +108,7 @@ public InputStream getBody() { @Override public HttpHeaders getHeaders() { - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.add(HttpHeaders.CONTENT_TYPE, contentType); - return httpHeaders; + responseHeaders.add(HttpHeaders.CONTENT_TYPE, contentType); + return responseHeaders; } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java index 3f9a616f42..6d72cd5136 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java @@ -41,9 +41,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -209,10 +212,29 @@ public static class FeignRetry extends AbstractRetry { @Override public Optional getCode(Object result) { + Optional resultOptional = getMethodResult(result, "status"); + return resultOptional.map(String::valueOf); + } + + @Override + public Optional> getHeaderNames(Object result) { + Optional resultOptional = getMethodResult(result, "headers"); + if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) { + return Optional.empty(); + } + Map headers = (Map) resultOptional.get(); + Set headerNames = new HashSet<>(); + for (Map.Entry entry : headers.entrySet()) { + headerNames.add(entry.getKey().toString()); + } + return Optional.of(headerNames); + } + + private Optional getMethodResult(Object result, String methodName) { final Optional status = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> { final Method method; try { - method = result.getClass().getDeclaredMethod("status"); + method = result.getClass().getDeclaredMethod(methodName); method.setAccessible(true); return method; } catch (NoSuchMethodException ex) { @@ -225,7 +247,7 @@ public Optional getCode(Object result) { return Optional.empty(); } try { - return Optional.of(String.valueOf(status.get().invoke(result))); + return Optional.of(status.get().invoke(result)); } catch (IllegalAccessException ex) { LOGGER.warning(String.format(Locale.ENGLISH, "Can not find method status from class [%s]!", result.getClass().getCanonicalName())); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java new file mode 100644 index 0000000000..0f2f85b640 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * http client request declarer + * + * @author zhp + * @since 2024-11-30 + */ +public class HttpClientConnectionSendHeaderDeclarer extends AbstractXdsDeclarer { + /** + * the fully qualified name of the enhanced class + */ + private static final String[] ENHANCE_CLASS = {"org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnection", + "org.apache.http.impl.DefaultBHttpClientConnection"}; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("sendRequestHeader"), + new HttpRequestSendHeaderInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java index 905d36eb22..0695cf784b 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java @@ -20,13 +20,16 @@ import io.github.resilience4j.retry.Retry; import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.CollectionUtils; import io.sermant.flowcontrol.common.config.ConfigConst; import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.entity.FlowControlServiceMeta; import io.sermant.flowcontrol.common.entity.HttpRequestEntity; import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import io.sermant.flowcontrol.inject.DefaultClientHttpResponse; import io.sermant.flowcontrol.inject.RetryClientHttpResponse; import io.sermant.flowcontrol.service.InterceptorSupporter; @@ -39,9 +42,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.logging.Logger; @@ -95,12 +101,12 @@ protected final ExecuteContext doBefore(ExecuteContext context) { if (flowControlResult.isSkip()) { context.skip(new DefaultClientHttpResponse(flowControlResult)); } else { - tryExeWithRetry(context); + tryExeWithRetry(context, httpRequestEntity.get()); } return context; } - private void tryExeWithRetry(ExecuteContext context) { + private void tryExeWithRetry(ExecuteContext context, HttpRequestEntity httpRequestEntity) { final Object[] allArguments = context.getArguments(); final HttpRequest request = (HttpRequest) context.getObject(); Object result; @@ -118,18 +124,14 @@ private void tryExeWithRetry(ExecuteContext context) { } context.afterMethod(result, ex); try { - final Optional httpRequestEntity = convertToHttpEntity(request); - if (!httpRequestEntity.isPresent()) { - return; - } - RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity.get()); - final List handlers = getRetryHandler().getHandlers(httpRequestEntity.get()); + final List handlers = getRetryHandlers(httpRequestEntity); if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { // retry only one policy request.getHeaders().add(RETRY_KEY, RETRY_VALUE); result = handlers.get(0).executeCheckedSupplier(retryFunc::get); request.getHeaders().remove(RETRY_KEY); } + XdsThreadLocalUtil.removeSendByteFlag(); } catch (Throwable throwable) { LOGGER.warning(String.format(Locale.ENGLISH, "Failed to invoke method:%s for few times, reason:%s", @@ -140,9 +142,24 @@ private void tryExeWithRetry(ExecuteContext context) { context.skip(result); } + private List getRetryHandlers(HttpRequestEntity httpRequestEntity) { + List handlers = Collections.emptyList(); + if (xdsFlowControlConfig.isEnable() && XdsThreadLocalUtil.getScenarioInfo() != null) { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + RetryContext.INSTANCE.buildXdsRetryPolicy(scenarioInfo); + handlers = getRetryHandler().getXdsHandlers(scenarioInfo); + } + if (CollectionUtils.isEmpty(handlers)) { + RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity); + return getRetryHandler().getHandlers(httpRequestEntity); + } + return handlers; + } + @Override protected ExecuteContext doThrow(ExecuteContext context) { chooseHttpService().onThrow(className, context.getThrowable()); + XdsThreadLocalUtil.removeSendByteFlag(); return context; } @@ -155,6 +172,7 @@ protected final ExecuteContext doAfter(ExecuteContext context) throws IOExceptio chooseHttpService().onThrow(className, defaultException); } chooseHttpService().onAfter(className, context.getResult()); + XdsThreadLocalUtil.removeSendByteFlag(); return context; } @@ -175,10 +193,39 @@ public static class HttpRetry extends AbstractRetry { private static final String METHOD_KEY = "ClientHttpResponse#getRawStatusCode"; @Override - protected Optional getCode(Object result) { + public Optional getCode(Object result) { + Optional resultOptional = getMethodResult(result, "getRawStatusCode"); + return resultOptional.map(String::valueOf); + } + + @Override + public Optional> getHeaderNames(Object result) { + Optional resultOptional = getMethodResult(result, "getHeaders"); + if (!resultOptional.isPresent() || !(resultOptional.get() instanceof Map)) { + return Optional.empty(); + } + Map headers = (Map) resultOptional.get(); + Set headerNames = new HashSet<>(); + for (Map.Entry entry : headers.entrySet()) { + headerNames.add(entry.getKey().toString()); + } + return Optional.of(headerNames); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING_CLOUD; + } + + private Optional getMethodResult(Object result, String methodName) { final Optional getRawStatusCode = getInvokerMethod(result.getClass().getName() + METHOD_KEY, fn -> { try { - final Method method = result.getClass().getDeclaredMethod("getRawStatusCode"); + final Method method = result.getClass().getDeclaredMethod(methodName); method.setAccessible(true); return method; } catch (NoSuchMethodException ex) { @@ -192,7 +239,7 @@ protected Optional getCode(Object result) { return Optional.empty(); } try { - return Optional.of(String.valueOf(getRawStatusCode.get().invoke(result))); + return Optional.of(getRawStatusCode.get().invoke(result)); } catch (IllegalAccessException ex) { LOGGER.warning(String.format(Locale.ENGLISH, "Can not find method getRawStatusCode from class [%s]!", @@ -203,15 +250,5 @@ protected Optional getCode(Object result) { } return Optional.empty(); } - - @Override - public Class[] retryExceptions() { - return getRetryExceptions(); - } - - @Override - public RetryFramework retryType() { - return RetryFramework.SPRING_CLOUD; - } } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java new file mode 100644 index 0000000000..b5b7395f8c --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.Interceptor; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +/** + * Enhance the request header sending method to include an indicator of whether the request byte stream has been sent + * to the server + * + * @author zhp + * @since 2024-11-30 + */ +public class HttpRequestSendHeaderInterceptor implements Interceptor { + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + XdsThreadLocalUtil.setSendByteFlag(true); + return context; + } + + @Override + public ExecuteContext onThrow(ExecuteContext context) throws Exception { + return context; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java new file mode 100644 index 0000000000..d84df73e1c --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * okhttp request declarer + * + * @author zhp + * @since 2024-11-30 + */ +public class OkHttpSendHeaderDeclarer extends AbstractXdsDeclarer { + /** + * the fully qualified name of the enhanced class + */ + private static final String[] ENHANCE_CLASS = {"com.squareup.okhttp.internal.http.Http1xStream", + "com.squareup.okhttp.internal.http.Http2xStream, okhttp3.internal.connection.Exchange"}; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("writeRequestHeaders"), + new HttpRequestSendHeaderInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptor.java index 2b5e9359f5..d30d018991 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptor.java @@ -19,17 +19,14 @@ import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.CollectionUtils; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.common.handler.retry.RetryContext; -import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnSamePolicy; import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; import io.sermant.flowcontrol.service.InterceptorSupporter; -import org.springframework.cloud.client.ServiceInstance; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Locale; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.logging.Logger; @@ -38,7 +35,7 @@ * that the last called service can be selected again based on load balancing when retrying. * * @author zhouss - * @see RetryOnSamePolicy + * @see io.sermant.flowcontrol.common.handler.retry.policy.RetryOnUntriedPolicy * @since 2022-07-25 */ public class SpringLbChooseServerInterceptor extends InterceptorSupporter { @@ -59,7 +56,7 @@ public class SpringLbChooseServerInterceptor extends InterceptorSupporter { @Override protected ExecuteContext doBefore(ExecuteContext context) throws Exception { if (RetryContext.INSTANCE.isPolicyNeedRetry()) { - tryChangeServiceInstanceForRetry(context); + removeRetriedServiceInstance(context); } return context; } @@ -76,48 +73,12 @@ protected ExecuteContext doAfter(ExecuteContext context) throws Exception { return context; } - private void tryChangeServiceInstanceForRetry(ExecuteContext context) { - final RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy(); - if (retryPolicy != null && retryPolicy.getLastRetryServer() != null) { - retryPolicy.retryMark(); - final Optional result = buildResult(retryPolicy.getLastRetryServer(), - context.getMethod().getReturnType().getName()); - if (!result.isPresent()) { - return; - } - context.skip(result.get()); - } - } - - private Optional buildResult(Object lastServer, String responseClassName) { - String defaultResponseClazz = null; - if (RESPONSE_CLASS.equals(responseClassName)) { - defaultResponseClazz = DEFAULT_RESPONSE_CLASS; - } else if (RESPONSE_REACTIVE_CLASS.equals(responseClassName)) { - defaultResponseClazz = DEFAULT_RESPONSE_REACTIVE_CLASS; - } - if (defaultResponseClazz == null) { - return Optional.empty(); - } - final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); - try { - final Class clazz = contextClassLoader.loadClass(defaultResponseClazz); - final Constructor declaredConstructor = clazz.getDeclaredConstructor(ServiceInstance.class); - return Optional.of(declaredConstructor.newInstance(lastServer)); - } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException - | InvocationTargetException exception) { - LOGGER.warning(String.format(Locale.ENGLISH, - "Can not create loadbalancer response for retry! className: [%s]", defaultResponseClazz)); - } - return Optional.empty(); - } - private void updateServiceInstance(Object result) { final Optional getServer = ReflectUtils.invokeMethod(result, "getServer", null, null); if (!getServer.isPresent()) { return; } - RetryContext.INSTANCE.updateServiceInstance(getServer.get()); + RetryContext.INSTANCE.updateRetriedServiceInstance(getServer.get()); } private boolean isTarget(Object result) { @@ -129,4 +90,28 @@ private boolean isTarget(Object result) { protected boolean canInvoke(ExecuteContext context) { return true; } + + /** + * remove retried instance + * + * @param context The execution context of the Interceptor + */ + public void removeRetriedServiceInstance(ExecuteContext context) { + Object[] arguments = context.getArguments(); + if (arguments == null || arguments.length == 0 || !(arguments[0] instanceof List)) { + return; + } + final RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy(); + if (retryPolicy == null || CollectionUtils.isEmpty(retryPolicy.getAllRetriedInstance())) { + return; + } + retryPolicy.retryMark(); + List instances = new ArrayList<>((List) arguments[0]); + for (Object instance : retryPolicy.getAllRetriedInstance()) { + instances.remove(instance); + } + if (!instances.isEmpty()) { + arguments[0] = instances; + } + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptor.java index 718e2fd0cf..fc8a2c5f1e 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptor.java @@ -20,31 +20,35 @@ import com.google.common.base.Optional; import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.utils.CollectionUtils; import io.sermant.flowcontrol.common.handler.retry.RetryContext; -import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnSamePolicy; +import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnUntriedPolicy; import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; import io.sermant.flowcontrol.service.InterceptorSupporter; +import java.util.ArrayList; +import java.util.List; + /** * Ribbon method select instance method to intercept. Obtain the last called service through interception, so that the * last called service can be selected again based on load balancing when retrying. * * @author zhouss - * @see RetryOnSamePolicy + * @see RetryOnUntriedPolicy * @since 2022-07-25 */ public class SpringRibbonChooseServerInterceptor extends InterceptorSupporter { @Override protected ExecuteContext doBefore(ExecuteContext context) throws Exception { if (RetryContext.INSTANCE.isPolicyNeedRetry()) { - tryChangeServiceInstanceForRetry(context); + removeRetriedServiceInstance(context); } return context; } @Override protected ExecuteContext doAfter(ExecuteContext context) throws Exception { - if (!RetryContext.INSTANCE.isPolicyNeedRetry()) { + if (RetryContext.INSTANCE.isPolicyNeedRetry()) { updateServiceInstance(context); } return context; @@ -57,15 +61,7 @@ private void updateServiceInstance(ExecuteContext context) { if (!serverInstanceOption.isPresent()) { return; } - RetryContext.INSTANCE.updateServiceInstance(serverInstanceOption.get()); - } - } - - private void tryChangeServiceInstanceForRetry(ExecuteContext context) { - final RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy(); - if (retryPolicy != null && retryPolicy.getLastRetryServer() != null) { - retryPolicy.retryMark(); - context.skip(Optional.of(retryPolicy.getLastRetryServer())); + RetryContext.INSTANCE.updateRetriedServiceInstance(serverInstanceOption.get()); } } @@ -73,4 +69,28 @@ private void tryChangeServiceInstanceForRetry(ExecuteContext context) { protected boolean canInvoke(ExecuteContext context) { return true; } + + /** + * remove retried instance + * + * @param context The execution context of the Interceptor + */ + public void removeRetriedServiceInstance(ExecuteContext context) { + Object[] arguments = context.getArguments(); + if (arguments == null || arguments.length == 0 || !(arguments[0] instanceof List)) { + return; + } + final RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy(); + if (retryPolicy == null || CollectionUtils.isEmpty(retryPolicy.getAllRetriedInstance())) { + return; + } + retryPolicy.retryMark(); + List instances = new ArrayList<>((List) arguments[0]); + for (Object instance : retryPolicy.getAllRetriedInstance()) { + instances.remove(instance); + } + if (!instances.isEmpty()) { + arguments[0] = instances; + } + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xDeclarer.java new file mode 100644 index 0000000000..8b5f40a5e8 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xDeclarer.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * Interception is handled for httpAsyncClient 4.x + * + * @author zhp + * @since 2024-12-20 + */ +public class HttpAsyncClient4xDeclarer extends AbstractXdsDeclarer { + /** + * Fully qualified HTTP requests for enhanced classes + */ + private static final String[] ENHANCE_CLASSES = { + "org.apache.http.impl.nio.client.InternalHttpAsyncClient", + "org.apache.http.impl.nio.client.MinimalHttpAsyncClient" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("execute") + .and(MethodMatcher.paramTypesEqual( + "org.apache.http.nio.protocol.HttpAsyncRequestProducer", + "org.apache.http.nio.protocol.HttpAsyncResponseConsumer", + "org.apache.http.protocol.HttpContext", + "org.apache.http.concurrent.FutureCallback")), + new HttpAsyncClient4xInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xInterceptor.java new file mode 100644 index 0000000000..19c43e48d3 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpAsyncClient4xInterceptor.java @@ -0,0 +1,299 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.github.resilience4j.retry.Retry; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; +import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.entity.HttpAsyncInvokerResult; +import io.sermant.flowcontrol.entity.HttpAsyncRequestProducerDecorator; + +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.ProtocolVersion; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIUtils; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.EnglishReasonPhraseCatalog; +import org.apache.http.message.BasicHttpResponse; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * HTTP interception only for version 4. x + * + * @author zhp + * @since 2024-12-20 + */ +public class HttpAsyncClient4xInterceptor extends AbstractXdsHttpClientInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final int CALL_BACK_INDEX = 3; + + private static final String PROTOCOL = "HTTP"; + + private final String className = HttpAsyncClient4xInterceptor.class.getName(); + + private final io.sermant.flowcontrol.common.handler.retry.Retry retry = new HttpAsyncClientRetry(); + + /** + * Pre trigger point + * + * @param context Execution context + * @return Execution context + * @throws Exception Execution exception + */ + @Override + public ExecuteContext doBefore(ExecuteContext context) throws Exception { + Object httpAsyncRequestProducerArgument = context.getArguments()[0]; + if (!(httpAsyncRequestProducerArgument instanceof HttpAsyncRequestProducer)) { + return context; + } + HttpAsyncRequestProducer httpAsyncRequestProducer + = (HttpAsyncRequestProducer) httpAsyncRequestProducerArgument; + final FlowControlResult flowControlResult = new FlowControlResult(); + HttpRequestBase httpRequest = (HttpRequestBase) httpAsyncRequestProducer.generateRequest(); + if (!needHandleXdsRouter(httpRequest)) { + return context; + } + final Optional httpRequestEntity = convertToHttpEntity(httpRequest); + if (!httpRequestEntity.isPresent()) { + return context; + } + FutureCallback callback = (FutureCallback) context.getArguments()[CALL_BACK_INDEX]; + if (needsCircuitBreak()) { + context.skip(buildBiFunc(callback, flowControlResult, MESSAGE)); + } + chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + if (flowControlResult.isSkip()) { + context.skip(buildBiFunc(callback, flowControlResult, flowControlResult.buildResponseMsg())); + return context; + } + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return context; + } + ServiceInstance instance = serviceInstanceOptional.get(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + scenarioInfo.setAddress(instance.getHost() + CommonConst.CONNECT + instance.getPort()); + try { + context.getArguments()[0] = rebuildProducer(context, + new URI(XdsRouterUtils.rebuildUrlByXdsServiceInstance(httpRequest.getURI(), instance))); + } catch (URISyntaxException e) { + LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage()); + return context; + } + tryExeWithRetry(context, callback, httpRequestEntity.get()); + return context; + } + + @Override + protected int getStatusCode(ExecuteContext context) { + Optional statusCodeOptional = retry.getCode(context.getResult()); + return statusCodeOptional.map(Integer::parseInt).orElse(-1); + } + + private void tryExeWithRetry(ExecuteContext context, FutureCallback callback, + HttpRequestEntity httpRequestEntity) { + final Object[] allArguments = context.getArguments(); + Object result; + Throwable ex = context.getThrowable(); + final Supplier retryFunc = createRetryFunc(context.getObject(), + context.getMethod(), allArguments, context.getResult()); + RetryContext.INSTANCE.markRetry(retry); + try { + // first execution taking over the host logic + result = retryFunc.get(); + } catch (Throwable throwable) { + ex = throwable; + result = new BasicHttpResponse(new BasicStatusLine( + new ProtocolVersion(PROTOCOL, 1, 1), HttpStatus.SC_NOT_FOUND, + getExMsg(throwable)), EnglishReasonPhraseCatalog.INSTANCE, Locale.SIMPLIFIED_CHINESE); + log(throwable); + } + context.afterMethod(result, ex); + try { + RetryContext.INSTANCE.buildRetryPolicy(httpRequestEntity); + final List handlers = getRetryHandlers(); + if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { + result = handlers.get(0).executeCheckedSupplier(retryFunc::get); + } + XdsThreadLocalUtil.removeSendByteFlag(); + } catch (Throwable throwable) { + LOGGER.warning(String.format(Locale.ENGLISH, + "Failed to invoke method:%s for few times, reason:%s", + context.getMethod().getName(), getExMsg(throwable))); + } finally { + RetryContext.INSTANCE.remove(); + } + context.skip(result); + } + + private BiFunction buildBiFunc(FutureCallback callback, + FlowControlResult flowControlResult, String msg) { + return (timeout, timeUnit) -> { + int statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR; + if (flowControlResult.getResponse() == null) { + statusCode = flowControlResult.getResponse().getCode(); + } + final BasicHttpResponse basicHttpResponse = new BasicHttpResponse(new BasicStatusLine( + new ProtocolVersion(PROTOCOL, 1, 1), statusCode, msg), + EnglishReasonPhraseCatalog.INSTANCE, Locale.SIMPLIFIED_CHINESE); + basicHttpResponse.setEntity(new StringEntity(msg, ContentType.TEXT_HTML)); + HttpAsyncInvokerResult result = new HttpAsyncInvokerResult(null, basicHttpResponse); + notify(callback, result.getResult()); + return result; + }; + } + + private Optional convertToHttpEntity(HttpRequestBase httpRequest) { + if (httpRequest == null || httpRequest.getURI() == null) { + return Optional.empty(); + } + URI uri = httpRequest.getURI(); + final Map headers = getHeaders(httpRequest); + return Optional.of(new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT) + .setApiPath(uri.getPath()).setHeaders(headers) + .setMethod(httpRequest.getMethod()) + .setServiceName(uri.getHost()) + .build()); + } + + private void notify(FutureCallback callback, Object response) { + if (callback == null) { + return; + } + if (response instanceof Exception) { + callback.failed((Exception) response); + } else { + callback.completed((HttpResponse) response); + } + } + + private Map getHeaders(HttpRequest httpRequest) { + Map headerMap = new HashMap<>(); + for (Header header : httpRequest.getAllHeaders()) { + headerMap.putIfAbsent(header.getName(), header.getValue()); + } + return headerMap; + } + + private boolean needHandleXdsRouter(HttpRequestBase httpRequest) { + URI uri = httpRequest.getURI(); + String host = uri.getHost(); + String serviceName = host.split(CommonConst.ESCAPED_POINT)[0]; + return XdsRouterUtils.isXdsRouteRequired(serviceName); + } + + private HttpAsyncRequestProducer rebuildProducer(ExecuteContext context, URI newUri) { + return new HttpAsyncRequestProducerDecorator((HttpAsyncRequestProducer) context.getArguments()[0], + buildRequestDecorator(newUri), buildHostDecorator(newUri)); + } + + private Function buildHostDecorator(URI newUri) { + return httpHost -> rebuildHttpHost(newUri); + } + + private Function buildRequestDecorator(URI newUri) { + return httpRequest -> updateRequestUri(newUri, httpRequest); + } + + private HttpHost rebuildHttpHost(URI newUri) { + return URIUtils.extractHost(newUri); + } + + private HttpRequest updateRequestUri(URI newUri, HttpRequest httpUriRequest) { + HttpRequestBase httpRequest = (HttpRequestBase) httpUriRequest; + httpRequest.setURI(newUri); + return httpRequest; + } + + /** + * Http Async Client retry + * + * @since 2022-02-21 + */ + public static class HttpAsyncClientRetry extends AbstractRetry { + @Override + public Optional getCode(Object result) { + if (!(result instanceof HttpResponse)) { + return Optional.empty(); + } + HttpResponse httpResponse = (HttpResponse) result; + if (httpResponse.getStatusLine() == null) { + return Optional.empty(); + } + return Optional.of(String.valueOf(httpResponse.getStatusLine().getStatusCode())); + } + + @Override + public Optional> getHeaderNames(Object result) { + if (!(result instanceof HttpResponse)) { + return Optional.empty(); + } + HttpResponse httpResponse = (HttpResponse) result; + Set headerNames = new HashSet<>(); + for (Header header : httpResponse.getAllHeaders()) { + headerNames.add(header.getName()); + } + return Optional.of(headerNames); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java new file mode 100644 index 0000000000..f260e01b5e --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * For HTTP requests, obtain the instance list from the registry to intercept the

4.x version only

+ * + * @author zhp + * @since 2024-12-20 + */ +public class HttpClient4xDeclarer extends AbstractXdsDeclarer { + /** + * Fully qualified HTTP requests for enhanced classes + */ + private static final String[] ENHANCE_CLASSES = { + "org.apache.http.impl.client.AbstractHttpClient", + "org.apache.http.impl.client.DefaultRequestDirector", + "org.apache.http.impl.client.InternalHttpClient", + "org.apache.http.impl.client.MinimalHttpClient" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains("doExecute", "execute") + .and(MethodMatcher.paramTypesEqual( + "org.apache.http.HttpHost", + "org.apache.http.HttpRequest", + "org.apache.http.protocol.HttpContext")), + new HttpClient4xInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java new file mode 100644 index 0000000000..0d3ecac686 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java @@ -0,0 +1,228 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.github.resilience4j.retry.Retry; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; +import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.entity.ErrorCloseableHttpResponse; + +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpRequestBase; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * HTTP interception only for version 4. x + * + * @author zhp + * @since 2024-12-20 + */ +public class HttpClient4xInterceptor extends AbstractXdsHttpClientInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private final String className = HttpClient4xInterceptor.class.getName(); + + private final io.sermant.flowcontrol.common.handler.retry.Retry retry = new HttpClientRetry(); + + /** + * Pre-trigger point + * + * @param context Execution context + * @return Execution context + */ + @Override + public ExecuteContext doBefore(ExecuteContext context) { + Object[] arguments = context.getArguments(); + Object httpRequestObject = arguments[1]; + if (!(httpRequestObject instanceof HttpRequestBase)) { + return context; + } + final HttpRequestBase httpRequest = (HttpRequestBase) httpRequestObject; + if (!needHandleXdsRouter(httpRequest) && RetryContext.INSTANCE.getRetryPolicy() == null) { + return context; + } + if (needsCircuitBreak()) { + context.skip(new ErrorCloseableHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, MESSAGE, + httpRequest.getProtocolVersion())); + } + final Optional httpRequestEntity = convertToHttpEntity(httpRequest); + if (!httpRequestEntity.isPresent()) { + return context; + } + final FlowControlResult flowControlResult = new FlowControlResult(); + chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + if (flowControlResult.isSkip()) { + context.skip(new ErrorCloseableHttpResponse(flowControlResult.getResponse().getCode(), + flowControlResult.buildResponseMsg(), httpRequest.getProtocolVersion())); + return context; + } + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return context; + } + ServiceInstance instance = serviceInstanceOptional.get(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + scenarioInfo.setAddress(instance.getHost() + CommonConst.CONNECT + instance.getPort()); + try { + httpRequest.setURI(new URI(XdsRouterUtils.rebuildUrlByXdsServiceInstance(httpRequest.getURI(), instance))); + arguments[0] = new HttpHost(instance.getHost(), instance.getPort()); + } catch (URISyntaxException e) { + LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage()); + return context; + } + tryExeWithRetry(context, httpRequest, httpRequestEntity.get()); + return context; + } + + private void tryExeWithRetry(ExecuteContext context, HttpRequestBase httpRequest, HttpRequestEntity requestEntity) { + final Object[] allArguments = context.getArguments(); + Object result; + Throwable ex = context.getThrowable(); + final Supplier retryFunc = createRetryFunc(context.getObject(), + context.getMethod(), allArguments, context.getResult()); + RetryContext.INSTANCE.markRetry(retry); + try { + // first execution taking over the host logic + result = retryFunc.get(); + } catch (Throwable throwable) { + ex = throwable; + result = new ErrorCloseableHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, + throwable.getMessage(), httpRequest.getProtocolVersion()); + log(throwable); + } + context.afterMethod(result, ex); + try { + RetryContext.INSTANCE.buildRetryPolicy(requestEntity); + final List handlers = getRetryHandlers(); + if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { + result = handlers.get(0).executeCheckedSupplier(retryFunc::get); + } + XdsThreadLocalUtil.removeSendByteFlag(); + } catch (Throwable throwable) { + LOGGER.warning(String.format(Locale.ENGLISH, + "Failed to invoke method:%s for few times, reason:%s", + context.getMethod().getName(), getExMsg(throwable))); + } finally { + RetryContext.INSTANCE.remove(); + } + context.skip(result); + } + + private Optional convertToHttpEntity(HttpRequestBase httpRequest) { + if (httpRequest == null || httpRequest.getURI() == null) { + return Optional.empty(); + } + URI uri = httpRequest.getURI(); + final Map headers = getHeaders(httpRequest); + return Optional.of(new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT) + .setApiPath(uri.getPath()).setHeaders(headers) + .setMethod(httpRequest.getMethod()) + .setServiceName(uri.getHost()) + .build()); + } + + private Map getHeaders(HttpRequest httpRequest) { + Map headerMap = new HashMap<>(); + for (Header header : httpRequest.getAllHeaders()) { + headerMap.putIfAbsent(header.getName(), header.getValue()); + } + return headerMap; + } + + private boolean needHandleXdsRouter(HttpRequestBase httpRequest) { + URI uri = httpRequest.getURI(); + String host = uri.getHost(); + String serviceName = host.split(CommonConst.ESCAPED_POINT)[0]; + return XdsRouterUtils.isXdsRouteRequired(serviceName); + } + + @Override + public int getStatusCode(ExecuteContext context) { + Optional statusCodeOptional = retry.getCode(context.getResult()); + return statusCodeOptional.map(Integer::parseInt).orElse(-1); + } + + /** + * Http Client retry + * + * @since 2022-02-21 + */ + public static class HttpClientRetry extends AbstractRetry { + @Override + public Optional getCode(Object result) { + if (!(result instanceof CloseableHttpResponse)) { + return Optional.empty(); + } + CloseableHttpResponse httpResponse = (CloseableHttpResponse) result; + if (httpResponse.getStatusLine() == null) { + return Optional.empty(); + } + return Optional.of(String.valueOf(httpResponse.getStatusLine().getStatusCode())); + } + + @Override + public Optional> getHeaderNames(Object result) { + if (!(result instanceof CloseableHttpResponse)) { + return Optional.empty(); + } + CloseableHttpResponse httpResponse = (CloseableHttpResponse) result; + Set headerNames = new HashSet<>(); + for (Header header : httpResponse.getAllHeaders()) { + headerNames.add(header.getName()); + } + return Optional.of(headerNames); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java index 09375ae46c..b0092d85cd 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreator.java @@ -17,10 +17,14 @@ package io.sermant.flowcontrol.retry.handler; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.core.utils.ReflectUtils; +import io.sermant.core.utils.StringUtils; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; import io.sermant.flowcontrol.common.handler.retry.Retry; +import io.sermant.flowcontrol.common.xds.retry.RetryCondition; +import io.sermant.flowcontrol.common.xds.retry.RetryConditionManager; import java.io.IOException; import java.net.ConnectException; @@ -67,6 +71,17 @@ public Predicate createExceptionPredicate(Class[ .orElseGet(() -> throwable -> true); } + @Override + public Predicate createExceptionPredicate(Class[] retryExceptions, + XdsRetryPolicy policy) { + String retryOn = policy.getRetryOn(); + if (StringUtils.isEmpty(retryOn)) { + return result -> false; + } + List conditions = Arrays.asList(retryOn.split(",")); + return (Throwable ex)->needRetry(ex, conditions); + } + private Predicate createExceptionPredicate(Class retryClass) { return (Throwable ex) -> { if (retryClass.isAssignableFrom(getRealExceptionClass(ex))) { @@ -120,4 +135,50 @@ public Predicate createResultPredicate(Retry retry, RetryRule rule) { } return result -> retry.needRetry(new HashSet<>(retryOnResponseStatus), result); } + + @Override + public Predicate createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy) { + String retryOn = xdsRetryPolicy.getRetryOn(); + if (StringUtils.isEmpty(retryOn)) { + return result -> false; + } + List conditions = Arrays.asList(retryOn.split(",")); + return result -> needRetry(retry, result, conditions); + } + + private boolean needRetry(Retry retry, Object result, List conditions) { + Optional statusCodeOptional = retry.getCode(result); + if (!statusCodeOptional.isPresent()) { + return false; + } + String statusCode = statusCodeOptional.get(); + if (conditions.contains(statusCode)) { + return true; + } + for (String conditionName : conditions) { + RetryCondition retryCondition = RetryConditionManager.getRetryCondition(conditionName); + if (retryCondition == null) { + continue; + } + boolean needRetry = retryCondition.needRetry(retry, null, statusCode, result); + if (needRetry) { + return true; + } + } + return false; + } + + private boolean needRetry(Throwable ex, List conditions) { + for (String conditionName : conditions) { + RetryCondition retryCondition = RetryConditionManager.getRetryCondition(conditionName); + if (retryCondition == null) { + continue; + } + boolean needRetry = retryCondition.needRetry(null, ex, null, null); + if (needRetry) { + return true; + } + } + return false; + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java index de7268ba2a..c86c33d02e 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryHandlerV2.java @@ -21,10 +21,13 @@ import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryRegistry; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.flowcontrol.common.core.resolver.RetryResolver; import io.sermant.flowcontrol.common.core.rule.RetryRule; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; import io.sermant.flowcontrol.common.handler.AbstractRequestHandler; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.xds.handler.XdsFlowControlHandler; import io.sermant.flowcontrol.retry.FeignRequestInterceptor.FeignRetry; import io.sermant.flowcontrol.retry.HttpRequestInterceptor.HttpRetry; @@ -39,6 +42,31 @@ public class RetryHandlerV2 extends AbstractRequestHandler { private final RetryPredicateCreator retryPredicateCreator = new DefaultRetryPredicateCreator(); + @Override + public Optional createProcessor(FlowControlScenario flowControlScenario, String businessName) { + final io.sermant.flowcontrol.common.handler.retry.Retry retry = RetryContext.INSTANCE.getRetry(); + if (retry == null) { + return Optional.empty(); + } + Optional retryPolicyOptional = XdsFlowControlHandler.INSTANCE + .getRetryPolicy(flowControlScenario.getServiceName(), flowControlScenario.getRouteName()); + if (!retryPolicyOptional.isPresent()) { + return Optional.empty(); + } + XdsRetryPolicy retryPolicy = retryPolicyOptional.get(); + if (retryPolicy.getPerTryTimeout() <= 0) { + return Optional.empty(); + } + final RetryConfig retryConfig = RetryConfig.custom() + .maxAttempts(getMaxAttempts(retry, (int)retryPolicy.getMaxAttempts())) + .retryOnResult(retryPredicateCreator.createResultPredicate(retry, retryPolicy)) + .retryOnException(retryPredicateCreator.createExceptionPredicate(retry.retryExceptions(), retryPolicy)) + .intervalFunction(IntervalFunction.of(retryPolicy.getPerTryTimeout())) + .failAfterMaxAttempts(false) + .build(); + return Optional.of(RetryRegistry.of(retryConfig).retry(businessName)); + } + @Override protected Optional createProcessor(String businessName, RetryRule rule) { final io.sermant.flowcontrol.common.handler.retry.Retry retry = RetryContext.INSTANCE.getRetry(); @@ -46,7 +74,7 @@ protected Optional createProcessor(String businessName, RetryRule rule) { return Optional.empty(); } final RetryConfig retryConfig = RetryConfig.custom() - .maxAttempts(getMaxAttempts(retry, rule)) + .maxAttempts(getMaxAttempts(retry, rule.getMaxAttempts())) .retryOnResult(retryPredicateCreator.createResultPredicate(retry, rule)) .retryOnException(retryPredicateCreator.createExceptionPredicate(retry.retryExceptions())) .intervalFunction(getIntervalFunction(rule)) @@ -61,14 +89,14 @@ protected Optional createProcessor(String businessName, RetryRule rule) { * based approach is{@link FeignRetry}, others are injection * * @param retry retry type - * @param rule rule + * @param maxAttempts maximum retry * @return maximum retry */ - private int getMaxAttempts(io.sermant.flowcontrol.common.handler.retry.Retry retry, RetryRule rule) { + private int getMaxAttempts(io.sermant.flowcontrol.common.handler.retry.Retry retry, int maxAttempts) { if (retry instanceof FeignRetry || retry instanceof HttpRetry) { - return rule.getMaxAttempts(); + return maxAttempts; } - return rule.getMaxAttempts() + 1; + return maxAttempts + 1; } @Override diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java index 239641bd4b..97f5705921 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/handler/RetryPredicateCreator.java @@ -17,6 +17,7 @@ package io.sermant.flowcontrol.retry.handler; +import io.sermant.core.service.xds.entity.XdsRetryPolicy; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.handler.retry.Retry; @@ -37,6 +38,15 @@ public interface RetryPredicateCreator { */ Predicate createExceptionPredicate(Class[] retryExceptions); + /** + * Create exception Predicate + * + * @param retryExceptions retry exception set + * @param policy retry rule + * @return Predicate + */ + Predicate createExceptionPredicate(Class[] retryExceptions, XdsRetryPolicy policy); + /** * create retry result predicate * @@ -45,4 +55,13 @@ public interface RetryPredicateCreator { * @return Predicate */ Predicate createResultPredicate(Retry retry, RetryRule rule); + + /** + * create retry result predicate + * + * @param retry retry + * @param xdsRetryPolicy retry rule + * @return Predicate + */ + Predicate createResultPredicate(Retry retry, XdsRetryPolicy xdsRetryPolicy); } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java index 0f7c71ec0b..def6b8083e 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/service/InterceptorSupporter.java @@ -24,6 +24,7 @@ import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.plugin.service.PluginServiceManager; import io.sermant.flowcontrol.common.config.FlowControlConfig; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.enums.FlowFramework; import io.sermant.flowcontrol.common.exception.InvokerWrapperException; import io.sermant.flowcontrol.common.handler.retry.RetryContext; @@ -78,6 +79,8 @@ public abstract class InterceptorSupporter extends ReflectMethodCacheSupport imp */ protected final FlowControlConfig flowControlConfig; + protected final XdsFlowControlConfig xdsFlowControlConfig; + private final ReentrantLock lock = new ReentrantLock(); private RetryHandlerV2 retryHandler = null; @@ -91,6 +94,7 @@ public abstract class InterceptorSupporter extends ReflectMethodCacheSupport imp */ protected InterceptorSupporter() { flowControlConfig = PluginConfigManager.getPluginConfig(FlowControlConfig.class); + xdsFlowControlConfig = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); } /** diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer index 49385b8e05..6e0dccf651 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -31,3 +31,13 @@ io.sermant.flowcontrol.retry.HttpRequestDeclarer io.sermant.flowcontrol.retry.SpringRibbonLbDeclarer io.sermant.flowcontrol.retry.SpringLbDeclarer io.sermant.flowcontrol.ClusterDeclarer +io.sermant.flowcontrol.retry.OkHttpSendHeaderDeclarer +io.sermant.flowcontrol.retry.HttpClientConnectionSendHeaderDeclarer +#HttpClient +io.sermant.flowcontrol.retry.client.OkHttpClientInterceptorChainDeclarer +io.sermant.flowcontrol.retry.client.HttpAsyncClient4xDeclarer +io.sermant.flowcontrol.retry.client.HttpClient4xDeclarer +io.sermant.flowcontrol.retry.client.HttpUrlConnectionConnectDeclarer +io.sermant.flowcontrol.retry.client.HttpUrlConnectionDisconnectDeclarer +io.sermant.flowcontrol.retry.client.OkHttp3ClientDeclarer +io.sermant.flowcontrol.retry.client.HttpUrlConnectionResponseStreamDeclarer \ No newline at end of file diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig index c9e6fc1731..f2fc25afdd 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig @@ -1 +1,2 @@ io.sermant.flowcontrol.common.config.FlowControlConfig +io.sermant.flowcontrol.common.config.XdsFlowControlConfig diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java index 6b4d598daa..35f7816571 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringLbChooseServerInterceptorTest.java @@ -21,13 +21,19 @@ import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.plugin.agent.interceptor.Interceptor; +import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.TestHelper; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; /** * spring load test @@ -38,6 +44,20 @@ public class SpringLbChooseServerInterceptorTest { private final Object server = new Object(); + private MockedStatic pluginConfigManagerMockedStatic; + + /** + * pre initialization + */ + @Before + public void before() throws Exception { + XdsFlowControlConfig xdsFlowControlConfig = new XdsFlowControlConfig(); + xdsFlowControlConfig.setEnable(true); + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(()->PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(xdsFlowControlConfig); + } + @Test public void testBefore() throws Exception { final Interceptor interceptor = getInterceptor(); @@ -49,7 +69,7 @@ public void testBefore() throws Exception { final Object instance = new Object(); // simulated update instance - RetryContext.INSTANCE.updateServiceInstance(instance); + RetryContext.INSTANCE.updateRetriedServiceInstance(instance); interceptor.before(context); assertNull(context.getResult()); RetryContext.INSTANCE.remove(); @@ -63,7 +83,7 @@ public void testAfter() { RetryContext.INSTANCE.buildRetryPolicy(retryRule); ReflectUtils.invokeMethod(interceptor, "updateServiceInstance", new Class[]{Object.class}, new Object[]{new TestResult()}); - Assert.assertEquals(RetryContext.INSTANCE.getRetryPolicy().getLastRetryServer(), server); + Assert.assertEquals(RetryContext.INSTANCE.getRetryPolicy().getAllRetriedInstance().get(0), server); RetryContext.INSTANCE.remove(); } @@ -81,4 +101,9 @@ private Object getServer() { protected Interceptor getInterceptor() { return new SpringLbChooseServerInterceptor(); } + + @After + public void after() { + pluginConfigManagerMockedStatic.close(); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java index 3fa5fb86ef..ce293cca96 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/SpringRibbonChooseServerInterceptorTest.java @@ -17,21 +17,29 @@ package io.sermant.flowcontrol.retry; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import com.google.common.base.Optional; import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.plugin.agent.interceptor.Interceptor; +import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.utils.ReflectUtils; import io.sermant.flowcontrol.TestHelper; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; import io.sermant.flowcontrol.common.core.rule.RetryRule; import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; /** * ribbon test @@ -42,6 +50,20 @@ public class SpringRibbonChooseServerInterceptorTest { private final Object server = new Object(); + private MockedStatic pluginConfigManagerMockedStatic; + + /** + * pre initialization + */ + @Before + public void before() throws Exception { + XdsFlowControlConfig xdsFlowControlConfig = new XdsFlowControlConfig(); + xdsFlowControlConfig.setEnable(true); + pluginConfigManagerMockedStatic = Mockito.mockStatic(PluginConfigManager.class); + pluginConfigManagerMockedStatic.when(()->PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class)) + .thenReturn(xdsFlowControlConfig); + } + @Test public void testBefore() throws Exception { final Interceptor interceptor = getInterceptor(); @@ -53,10 +75,14 @@ public void testBefore() throws Exception { final Object instance = new Object(); // simulated update instance - RetryContext.INSTANCE.updateServiceInstance(instance); + RetryContext.INSTANCE.updateRetriedServiceInstance(instance); + List instances = new ArrayList<>(); + instances.add(instance); + instances.add(new Object()); + context.changeArgs(new Object[]{instances}); interceptor.before(context); - assertTrue(context.getResult() instanceof Optional && ((Optional) context.getResult()).isPresent()); - assertEquals(((Optional) context.getResult()).get(), instance); + instances = (List) context.getArguments()[0]; + assertFalse(instances.contains(instance)); RetryContext.INSTANCE.remove(); } @@ -68,13 +94,18 @@ public void testAfter() throws NoSuchMethodException { RetryContext.INSTANCE.buildRetryPolicy(retryRule); final ExecuteContext context = TestHelper.buildDefaultContext(); context.changeResult(Optional.of(server)); - ReflectUtils.invokeMethod(interceptor, "updateServiceInstance", new Class[]{ExecuteContext.class}, - new Object[]{context}); - Assert.assertEquals(RetryContext.INSTANCE.getRetryPolicy().getLastRetryServer(), server); + ReflectUtils.invokeMethod(interceptor, "updateServiceInstance", new Class[]{Object.class}, + new Object[]{context.getResult()}); + Assert.assertEquals(RetryContext.INSTANCE.getRetryPolicy().getAllRetriedInstance().get(0), server); RetryContext.INSTANCE.remove(); } private Interceptor getInterceptor() { return new SpringRibbonChooseServerInterceptor(); } + + @After + public void after() { + pluginConfigManagerMockedStatic.close(); + } } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java index e5e9631b3b..d11e04a484 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/test/java/io/sermant/flowcontrol/retry/handler/DefaultRetryPredicateCreatorTest.java @@ -76,5 +76,15 @@ public Class[] retryExceptions() { public RetryFramework retryType() { return RetryFramework.SPRING_CLOUD; } + + @Override + public Optional getCode(Object result) { + return Optional.empty(); + } + + @Override + public Optional> getHeaderNames(Object result) { + return Optional.empty(); + } } }