Skip to content

Commit

Permalink
Add XDS retry and circuit-breaking functionality to the flow control …
Browse files Browse the repository at this point in the history
…plugin

Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Dec 21, 2024
1 parent 723b3e4 commit 91c6a96
Show file tree
Hide file tree
Showing 28 changed files with 1,847 additions and 105 deletions.
35 changes: 35 additions & 0 deletions sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<netflix-core.version>1.4.7.RELEASE</netflix-core.version>
<spring.cloud.context.version>2.2.0.RELEASE</spring.cloud.context.version>
<google.guava>31.1-jre</google.guava>
<ribbon.version>2.2.5</ribbon.version>
<apache-httpclient.version>4.5.13</apache-httpclient.version>
<http.client.async.verion>4.1.5</http.client.async.verion>
<okhttp.version>4.11.0</okhttp.version>
<okhttp.sq.version>2.7.5</okhttp.sq.version>
</properties>
<dependencies>
<!--compile-->
Expand Down Expand Up @@ -99,6 +104,12 @@
<version>${netflix-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-loadbalancer</artifactId>
<version>${ribbon.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
Expand All @@ -111,6 +122,24 @@
<version>${google.guava}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${apache-httpclient.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.sq.version}</version>
<scope>provided</scope>
</dependency>
<!--test-->
<dependency>
<groupId>junit</groupId>
Expand All @@ -136,6 +165,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>${http.client.async.verion}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.flowcontrol.common.config.XdsFlowControlConfig;

/**
* okhttp request declarer
*
* @author zhp
* @since 2024-11-30
*/
public abstract class AbstractXdsDeclarer extends AbstractPluginDeclarer {
private final XdsFlowControlConfig config = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class);

@Override
public boolean isEnabled() {
return config.isEnable();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers;
import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.handler.retry.RetryContext;
import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy;
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import io.sermant.flowcontrol.common.xds.circuit.CircuitBreakerManager;
import io.sermant.flowcontrol.common.xds.handler.XdsFlowControlHandler;
import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancer;
import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancerFactory;
import io.sermant.flowcontrol.service.InterceptorSupporter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* Enhance the client request sending functionality by performing Xds service instance discovery and circuit breaking
* during the request sending process
*
* @author zhp
* @since 2024-11-30
*/
public abstract class AbstractXdsHttpClientInterceptor extends InterceptorSupporter {
protected static final String MESSAGE = "CircuitBreaker has forced open and deny any requests";

private static final int MIN_SUCCESS_CODE = 200;

private static final int MAX_SUCCESS_CODE = 399;

private static final int HUNDRED = 100;

/**
* Perform circuit breaker judgment and handling
*
* @return The result of whether circuit breaking is needed
*/
public boolean needsCircuitBreak() {
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName())
|| StringUtils.isEmpty(scenarioInfo.getServiceName())
|| StringUtils.isEmpty(scenarioInfo.getAddress())) {
return false;
}
Optional<XdsRequestCircuitBreakers> circuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
getRequestCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
if (!circuitBreakersOptional.isPresent()) {
return false;
}
int activeRequestNum = CircuitBreakerManager.incrementActiveRequests(scenarioInfo.getServiceName(),
scenarioInfo.getClusterName(), scenarioInfo.getAddress());
int maxRequest = circuitBreakersOptional.get().getMaxRequests();
return maxRequest != 0 && activeRequestNum > maxRequest;
}

@Override
public ExecuteContext doAfter(ExecuteContext context) {
XdsThreadLocalUtil.removeSendByteFlag();
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
if (context.getThrowable() != null || scenarioInfo == null) {
return context;
}
decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo);
return context;
}

private void decrementActiveRequestsAndCountFailureRequests(ExecuteContext context,
FlowControlScenario scenarioInfo) {
CircuitBreakerManager.decrementActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getServiceName(),
scenarioInfo.getAddress());
int statusCode = getStatusCode(context);
if (statusCode >= MIN_SUCCESS_CODE && statusCode <= MAX_SUCCESS_CODE) {
return;
}
countFailedRequests(scenarioInfo, statusCode);
}

@Override
public ExecuteContext doThrow(ExecuteContext context) {
XdsThreadLocalUtil.removeSendByteFlag();
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
if (scenarioInfo == null) {
return context;
}
decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo);
return context;
}

/**
* handler failure request
*
* @param statusCode response code
* @param scenario scenario information
*/
private void countFailedRequests(FlowControlScenario scenario, int statusCode) {
CircuitBreakerManager.decrementActiveRequests(scenario.getServiceName(), scenario.getClusterName(),
scenario.getAddress());
Optional<XdsInstanceCircuitBreakers> instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
getInstanceCircuitBreakers(scenario.getServiceName(), scenario.getClusterName());
if (!instanceCircuitBreakersOptional.isPresent()) {
return;
}
XdsInstanceCircuitBreakers circuitBreakers = instanceCircuitBreakersOptional.get();
CircuitBreakerManager.countFailureRequest(scenario, scenario.getAddress(), statusCode, circuitBreakers);
}

/**
* Get status code
*
* @param context The execution context of the Interceptor
* @return response code
*/
protected abstract int getStatusCode(ExecuteContext context);

/**
* choose serviceInstance by xds rule
*
* @return result
*/
protected Optional<ServiceInstance> chooseServiceInstanceForXds() {
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
if (scenarioInfo == null || io.sermant.core.utils.StringUtils.isBlank(scenarioInfo.getServiceName())
|| io.sermant.core.utils.StringUtils.isEmpty(scenarioInfo.getClusterName())) {
return Optional.empty();
}
Set<ServiceInstance> serviceInstanceSet = XdsFlowControlHandler.INSTANCE.
getAllServerInstance(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
if (serviceInstanceSet.isEmpty()) {
return Optional.empty();
}
boolean needRetry = RetryContext.INSTANCE.isPolicyNeedRetry();
if (needRetry) {
removeRetriedServiceInstance(serviceInstanceSet);
}
removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet);
return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo));
}

private void removeRetriedServiceInstance(Set<ServiceInstance> serviceInstanceSet) {
RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy();
retryPolicy.retryMark();
List<Object> retriedInstance = retryPolicy.getAllRetriedInstance();
Set<ServiceInstance> allInstance = new HashSet<>(serviceInstanceSet);
for (Object retryInstance : retriedInstance) {
if (retryInstance instanceof ServiceInstance) {
serviceInstanceSet.remove(retryInstance);
}
}
if (CollectionUtils.isEmpty(serviceInstanceSet)) {
serviceInstanceSet.addAll(allInstance);
}
}

private ServiceInstance chooseServiceInstanceByLoadBalancer(Set<ServiceInstance> instanceSet,
FlowControlScenario scenarioInfo) {
XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory.getLoadBalancer(scenarioInfo.getServiceName(),
scenarioInfo.getClusterName());
return loadBalancer.selectInstance(new ArrayList<>(instanceSet));
}

private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set<ServiceInstance> instanceSet) {
Optional<XdsInstanceCircuitBreakers> instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
getInstanceCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
if (!instanceCircuitBreakersOptional.isPresent()) {
return;
}
XdsInstanceCircuitBreakers outlierDetection = instanceCircuitBreakersOptional.get();
int count = instanceSet.size();
if (checkMinInstanceNum(outlierDetection, count)) {
return;
}
List<ServiceInstance> circuitBreakerInstances = new ArrayList<>();
float maxCircuitBreakerPercent = (float) outlierDetection.getMaxEjectionPercent() / HUNDRED;
int maxCircuitBreakerInstances = (int) Math.floor(count * maxCircuitBreakerPercent);
for (ServiceInstance serviceInstance : instanceSet) {
if (hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) {
break;
}
String address = serviceInstance.getHost() + CommonConst.CONNECT + serviceInstance.getPort();
if (CircuitBreakerManager.needCircuitBreaker(scenarioInfo, address, outlierDetection)) {
circuitBreakerInstances.add(serviceInstance);
}
}
if (checkHealthInstanceNum(count, outlierDetection, circuitBreakerInstances.size())) {
return;
}
circuitBreakerInstances.forEach(instanceSet::remove);
}

private boolean hasReachedCircuitBreakerThreshold(List<ServiceInstance> circuitBreakerInstances,
int maxCircuitBreakerInstances) {
return circuitBreakerInstances.size() >= maxCircuitBreakerInstances;
}

private boolean checkHealthInstanceNum(int count, XdsInstanceCircuitBreakers outlierDetection, int size) {
return count * outlierDetection.getMinHealthPercent() >= (count - size);
}

private boolean checkMinInstanceNum(XdsInstanceCircuitBreakers outlierDetection, int count) {
return outlierDetection.getFailurePercentageMinimumHosts() > count;
}

@Override
protected boolean canInvoke(ExecuteContext context) {
return true;
}

/**
* Get Retry Handler
*
* @return Retry Handlers
*/
protected List<io.github.resilience4j.retry.Retry> getRetryHandlers() {
if (XdsThreadLocalUtil.getScenarioInfo() != null) {
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
RetryContext.INSTANCE.buildXdsRetryPolicy(scenarioInfo);
return getRetryHandler().getXdsHandlers(scenarioInfo);
}
return Collections.emptyList();
}
}
Loading

0 comments on commit 91c6a96

Please sign in to comment.