Skip to content

Commit

Permalink
Optimize the performance of xds flow control functionality
Browse files Browse the repository at this point in the history
Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Jan 3, 2025
1 parent 3626542 commit 954e6c4
Show file tree
Hide file tree
Showing 37 changed files with 853 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ public class CommonConst {
*/
public static final int DEFAULT_RESPONSE_CODE = -1;

/**
* the key of Scenario information for flow control
*/
public static final String SCENARIO_INFO = "flowControlScenario";

/**
* the key of request-information
*/
public static final String REQUEST_INFO = "REQUEST_INFO";

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ private boolean isPathMatched(XdsPathMatcher matcher, String path) {
}

private boolean isHeadersMatched(List<XdsHeaderMatcher> matchers, Map<String, String> headers) {
return matchers.stream()
.allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers));
for (XdsHeaderMatcher xdsHeaderMatcher : matchers) {
if (!xdsHeaderMatcher.isMatch(headers)) {
return false;
}
}
return true;
}

private String selectClusterByRoute(XdsRoute matchedRoute) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package io.sermant.flowcontrol.common.handler;

import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.flowcontrol.common.core.ResolverManager;
import io.sermant.flowcontrol.common.core.match.MatchManager;
import io.sermant.flowcontrol.common.core.resolver.AbstractResolver;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.RequestEntity;
import io.sermant.flowcontrol.common.util.StringUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -41,6 +42,12 @@
* @since 2022-01-22
*/
public abstract class AbstractRequestHandler<H, R> {
/**
* XDS Handler cache, , where the Key of the first level is the service name,
* the Key of the second level is the route name, the Key of the three level is the xdsRetryPolicy
*/
private final Map<String, Map<String, Map<String, Optional<H>>>> xdsHandlers = new ConcurrentHashMap<>();

/**
* Handler cache
*/
Expand Down Expand Up @@ -74,11 +81,20 @@ public List<H> getHandlers(RequestEntity request) {
/**
* gets the specified request handler
*
* @param flowControlScenario matched scenario information
* @param scenario Scenario information for flow control
* @param xdsRetryPolicy retry policy information
* @return handler
*/
public List<H> getXdsHandlers(FlowControlScenario flowControlScenario) {
Optional<H> handlerOptions = createHandler(flowControlScenario, StringUtils.EMPTY);
public List<H> getXdsHandlers(FlowControlScenario scenario, XdsRetryPolicy xdsRetryPolicy) {
Map<String, Map<String, Optional<H>>> routeHandler = xdsHandlers.computeIfAbsent(scenario.getServiceName(),
k -> new HashMap<>());
Map<String, Optional<H>> retryHandlers = routeHandler.computeIfAbsent(scenario.getClusterName(),
k -> new HashMap<>());
String retryName = xdsRetryPolicy.toString();
Optional<H> handlerOptions = retryHandlers.computeIfAbsent(retryName, s -> {
retryHandlers.clear();
return createHandler(xdsRetryPolicy, retryName);
});
return handlerOptions.map(Collections::singletonList).orElse(Collections.emptyList());
}

Expand Down Expand Up @@ -117,11 +133,11 @@ private Optional<H> create(String businessName) {
/**
* create handler
*
* @param flowControlScenario matched business information
* @param xdsRetryPolicy retry policy information
* @param businessName service scenario name
* @return handler
*/
public Optional<H> createHandler(FlowControlScenario flowControlScenario, String businessName) {
public Optional<H> createHandler(XdsRetryPolicy xdsRetryPolicy, String businessName) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
* @throws UnsupportedOperationException unsupported operation
*/
public Optional<String> getCode(Object result) {
throw new UnsupportedOperationException();
return Optional.empty();
}

/**
Expand All @@ -141,7 +141,7 @@ public Optional<String> getCode(Object result) {
* @throws UnsupportedOperationException unsupported operation
*/
public Optional<Set<String>> getHeaderNames(Object result) {
throw new UnsupportedOperationException();
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ public interface Retry {
RetryFramework retryType();

/**
* get status code
* get status code
*
* @param result interface response result
* @return response status code
*/
Optional<String> getCode(Object result);

/**
* get header
* get header
*
* @param result interface response result
* @return response header names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@
import io.sermant.flowcontrol.common.core.RuleUtils;
import io.sermant.flowcontrol.common.core.resolver.RetryResolver;
import io.sermant.flowcontrol.common.core.rule.RetryRule;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.HttpRequestEntity;
import io.sermant.flowcontrol.common.handler.retry.policy.RetryOnUntriedPolicy;
import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy;
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.xds.handler.XdsHandler;

import java.util.List;
import java.util.Optional;

/**
* Retry context, used to manage retry policies based on different host framework types
Expand Down Expand Up @@ -141,21 +137,11 @@ public void buildRetryPolicy(HttpRequestEntity requestEntity) {
}

/**
* build test strategy
* build retry policy
*
* @param scenario scenario information
* @param retryPolicy retry policy information
*/
public void buildXdsRetryPolicy(FlowControlScenario scenario) {
if (StringUtils.isEmpty(scenario.getServiceName())
|| StringUtils.isEmpty(scenario.getRouteName())) {
return;
}
Optional<XdsRetryPolicy> retryPolicyOptional = XdsHandler.INSTANCE
.getRetryPolicy(scenario.getServiceName(), scenario.getRouteName());
if (!retryPolicyOptional.isPresent()) {
return;
}
XdsRetryPolicy retryPolicy = retryPolicyOptional.get();
public void buildXdsRetryPolicy(XdsRetryPolicy retryPolicy) {
policyThreadLocal.set(new RetryOnUntriedPolicy((int) retryPolicy.getMaxAttempts()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.entity.HttpRequestEntity;
import io.sermant.flowcontrol.common.entity.RequestEntity;

import io.sermant.flowcontrol.common.util.XdsAbstractTest;
import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers;
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -48,24 +49,57 @@ public void testActiveRequests() {
}

@Test
public void testCircuitBreaker() {
public void testCircuitBreaker() throws InterruptedException {
final FlowControlScenario scenarioInfo = new FlowControlScenario();
scenarioInfo.setServiceName(SERVICE_NAME);
scenarioInfo.setClusterName(CLUSTER_NAME);
scenarioInfo.setRouteName(ROUTE_NAME);
scenarioInfo.setAddress(ADDRESS);
final XdsInstanceCircuitBreakers circuitBreakers = new XdsInstanceCircuitBreakers();
circuitBreakers.setSplitExternalLocalOriginErrors(false);
circuitBreakers.setConsecutiveLocalOriginFailure(1);
circuitBreakers.setConsecutiveGatewayFailure(1);
circuitBreakers.setConsecutiveLocalOriginFailure(0);
circuitBreakers.setConsecutiveGatewayFailure(0);
circuitBreakers.setConsecutive5xxFailure(1);
circuitBreakers.setInterval(1000L);
circuitBreakers.setBaseEjectionTime(10000L);
boolean result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertFalse(result);

// Test the number of errors from the server reached the threshold
XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 500, circuitBreakers);
XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);

// Test whether the circuit breaker time has been exceeded and whether it has been restored
Thread.sleep(1100L);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertFalse(result);

// Test the number of errors from the local source reached the threshold
circuitBreakers.setSplitExternalLocalOriginErrors(true);
circuitBreakers.setConsecutiveLocalOriginFailure(1);
XdsThreadLocalUtil.setConnectionStatus(false);
XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, -1, circuitBreakers);
XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);

// Test whether the actual circuit breaker time is the product of the number of circuit breakers multiplied
// by the configured circuit breaker time
Thread.sleep(1100L);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);
Thread.sleep(1000L);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertFalse(result);

// Test the number of errors from the gateway reached the threshold
circuitBreakers.setConsecutive5xxFailure(0);
circuitBreakers.setConsecutiveGatewayFailure(1);
XdsCircuitBreakerManager.recordFailureRequest(scenarioInfo, ADDRESS, 503, circuitBreakers);
XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenarioInfo);
result = XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, ADDRESS);
assertTrue(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.common.xds.ratelimit;

import io.sermant.core.service.xds.entity.XdsTokenBucket;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* RateLimitManager Test
*
* @author zhp
* @since 2024-12-02
*/
public class XdsRateLimitManagerTest {
private static final String SERVICE_NAME = "serviceA";

private static final String ROUTE_NAME = "routeA";

@Test
public void testFillAndConsumeToken() throws InterruptedException {
final XdsTokenBucket tokenBucket = new XdsTokenBucket();
tokenBucket.setMaxTokens(1);
tokenBucket.setTokensPerFill(1);
tokenBucket.setFillInterval(1000L);
boolean result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertTrue(result);

// The situation where all tokens have been consumed
result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertFalse(result);
Thread.sleep(1000L);

// Test token refill situation
result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertTrue(result);

// Test all refilled tokens have been consumed
result = XdsRateLimitManager.fillAndConsumeToken(SERVICE_NAME, ROUTE_NAME, tokenBucket);
assertFalse(result);
}
}
Loading

0 comments on commit 954e6c4

Please sign in to comment.