diff --git a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/NetworkUtils.java b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/NetworkUtils.java index 6815bcf4ce..33d5c17a03 100644 --- a/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/NetworkUtils.java +++ b/sermant-agentcore/sermant-agentcore-core/src/main/java/io/sermant/core/utils/NetworkUtils.java @@ -80,6 +80,22 @@ public static List getAllNetworkIp() { return allNetworkIps; } + /** + * getKubernetesPodIp + * + * @return pod ip + */ + public static String getKubernetesPodIp() { + String podIp = ""; + try { + InetAddress ipAddress = InetAddress.getLocalHost(); + podIp = ipAddress.getHostAddress(); + } catch (UnknownHostException e) { + LOGGER.severe("Failed to get IP address of the pod: " + e.getMessage()); + } + return podIp; + } + private static void parseNetworkIp(List result, Enumeration nii) { InetAddress ip; while (nii.hasMoreElements()) { diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/utils/XdsRouterUtils.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/utils/XdsRouterUtils.java new file mode 100644 index 0000000000..c954f62f95 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/utils/XdsRouterUtils.java @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.utils; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.core.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * XdsRouterUtils + * + * @author daizhenyu + * @since 2024-08-29 + **/ +public class XdsRouterUtils { + private static XdsServiceDiscovery serviceDiscovery = ServiceManager.getService(XdsCoreService.class) + .getXdsServiceDiscovery(); + + /** + * the locality information of the host microservice itself + */ + private static XdsLocality selfServiceLocality; + + private XdsRouterUtils() { + } + + /** + * get XdsLocality of self-service + * + * @return XdsLocality + */ + public static Optional getLocalityInfoOfSelfService() { + if (selfServiceLocality != null) { + return Optional.of(selfServiceLocality); + } + synchronized (XdsRouterUtils.class) { + if (selfServiceLocality != null) { + return Optional.of(selfServiceLocality); + } + String podIp = NetworkUtils.getKubernetesPodIp(); + if (StringUtils.isEmpty(podIp)) { + return Optional.empty(); + } + Set serviceInstances = serviceDiscovery + .getServiceInstance(ConfigManager.getConfig(ServiceMeta.class).getService()); + Optional serviceInstance = getMatchedServiceInstanceByPodIp(serviceInstances, podIp); + if (!serviceInstance.isPresent()) { + return Optional.empty(); + } + Optional validXdsLocality = createValidXdsLocality(serviceInstance.get().getMetaData()); + selfServiceLocality = validXdsLocality.orElse(null); + return validXdsLocality; + } + } + + private static Optional getMatchedServiceInstanceByPodIp(Set serviceInstances, + String podIp) { + return serviceInstances.stream() + .filter(serviceInstance -> podIp.equals(serviceInstance.getHost())) + .findFirst(); + } + + private static Optional createValidXdsLocality(Map metaData) { + XdsLocality locality = new XdsLocality(); + String region = metaData.get("region"); + String zone = metaData.get("zone"); + String subZone = metaData.get("sub_zone"); + if (StringUtils.isEmpty(region) || (StringUtils.isEmpty(zone) && !StringUtils.isEmpty(subZone))) { + return Optional.empty(); + } + locality.setRegion(region); + locality.setZone(zone); + locality.setSubZone(subZone); + return Optional.of(locality); + } +} diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/XdsRouterHandler.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/XdsRouterHandler.java new file mode 100644 index 0000000000..eb5dbdffe4 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/XdsRouterHandler.java @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsHeaderMatcher; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsClusterWeight; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsWeightedClusters; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.core.utils.StringUtils; +import io.sermant.router.common.utils.XdsRouterUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * XdsRouterHandler, filter service instances based on xDS routing rules + * + * @author daizhenyu + * @since 2024-08-29 + **/ +public enum XdsRouterHandler { + /** + * singleton + */ + INSTANCE; + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private final Random random = new Random(); + + private XdsRouteService routeService; + + private XdsServiceDiscovery serviceDiscovery; + + /** + * constructor + */ + XdsRouterHandler() { + XdsCoreService xdsCoreService = ServiceManager.getService(XdsCoreService.class); + if (xdsCoreService != null) { + routeService = xdsCoreService.getXdsRouteService(); + serviceDiscovery = xdsCoreService.getXdsServiceDiscovery(); + } + } + + /** + * getServiceInstanceByXdsRoute + * + * @param serviceName service name + * @param path request path + * @return serviceInstance + */ + public Set getServiceInstanceByXdsRoute(String serviceName, String path) { + return getMatchedServiceInstance(serviceName, path, null, MatchType.PATH); + } + + /** + * getServiceInstanceByXdsRoute + * + * @param serviceName service name + * @param headers request headers + * @return serviceInstance + */ + public Set getServiceInstanceByXdsRoute(String serviceName, Map headers) { + return getMatchedServiceInstance(serviceName, null, headers, MatchType.HEADER); + } + + /** + * getServiceInstanceByXdsRoute + * + * @param serviceName service name + * @param path request path + * @param headers request headers + * @return serviceInstance + */ + public Set getServiceInstanceByXdsRoute(String serviceName, String path, + Map headers) { + return getMatchedServiceInstance(serviceName, path, headers, MatchType.BOTH); + } + + private Set getMatchedServiceInstance(String serviceName, String path, + Map headers, MatchType matchType) { + if (routeService == null || serviceDiscovery == null) { + LOGGER.severe("xDS service not open for xDS routing."); + return Collections.EMPTY_SET; + } + List routes = routeService.getServiceRoute(serviceName); + XdsRoute matchedRoute = null; + boolean pathMatched = matchType == MatchType.PATH || matchType == MatchType.BOTH; + boolean headerMatched = matchType == MatchType.HEADER || matchType == MatchType.BOTH; + for (XdsRoute route : routes) { + XdsRouteMatch routeMatch = route.getRouteMatch(); + + // check path matching + if (pathMatched && !isPathMatched(routeMatch.getPathMatcher(), path)) { + continue; + } + + // check head matching + if (headerMatched && !isHeadersMatched(routeMatch.getHeaderMatchers(), headers)) { + continue; + } + matchedRoute = route; + break; + } + + if (matchedRoute == null) { + return serviceDiscovery.getServiceInstance(serviceName); + } + return handleXdsRoute(matchedRoute, serviceName); + } + + private Set handleXdsRoute(XdsRoute route, String serviceName) { + // select cluster + XdsRouteAction routeAction = route.getRouteAction(); + String cluster = routeAction.getCluster(); + if (routeAction.isWeighted()) { + cluster = selectClusterByWeight(routeAction.getWeightedClusters()); + } + + // get service instance of cluster + Optional loadAssigmentOptional = + serviceDiscovery.getClusterServiceInstance(cluster); + if (!loadAssigmentOptional.isPresent()) { + return serviceDiscovery.getServiceInstance(serviceName); + } + XdsClusterLoadAssigment clusterLoadAssigment = loadAssigmentOptional.get(); + + if (!routeService.isLocalityRoute(clusterLoadAssigment.getClusterName())) { + Set serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment); + return serviceInstances.isEmpty() ? serviceDiscovery.getServiceInstance(serviceName) : serviceInstances; + } + + // get locality info of self-service and route by locality + Optional localityInfoOfSelfService = XdsRouterUtils.getLocalityInfoOfSelfService(); + if (localityInfoOfSelfService.isPresent()) { + Set serviceInstances = getServiceInstanceOfLocalityCluster(clusterLoadAssigment, + localityInfoOfSelfService.get()); + if (!serviceInstances.isEmpty()) { + return serviceInstances; + } + } + + Set serviceInstances = getServiceInstanceOfCluster(clusterLoadAssigment); + return serviceInstances.isEmpty() ? serviceDiscovery.getServiceInstance(serviceName) : serviceInstances; + } + + private Set getServiceInstanceOfLocalityCluster(XdsClusterLoadAssigment clusterLoadAssigment, + XdsLocality locality) { + return clusterLoadAssigment.getLocalityInstances().entrySet().stream() + .filter(xdsLocalitySetEntry -> xdsLocalitySetEntry.getKey().equals(locality)) + .flatMap(xdsLocalitySetEntry -> xdsLocalitySetEntry.getValue().stream()) + .collect(Collectors.toSet()); + } + + private Set getServiceInstanceOfCluster(XdsClusterLoadAssigment clusterLoadAssigment) { + return clusterLoadAssigment.getLocalityInstances().entrySet().stream() + .flatMap(instanceEntry -> instanceEntry.getValue().stream()) + .collect(Collectors.toSet()); + } + + private boolean isPathMatched(XdsPathMatcher matcher, String path) { + return matcher.isMatch(path); + } + + private boolean isHeadersMatched(List matchers, Map headers) { + return matchers.stream() + .allMatch(xdsHeaderMatcher -> xdsHeaderMatcher.isMatch(headers)); + } + + private String selectClusterByWeight(XdsWeightedClusters weightedClusters) { + List clusters = weightedClusters.getClusters(); + int totalWeight = weightedClusters.getTotalWeight(); + if (CollectionUtils.isEmpty(clusters) || totalWeight == 0) { + return StringUtils.EMPTY; + } + int randomWeight = random.nextInt(totalWeight); + + int currentWeight = 0; + for (XdsClusterWeight clusterWeight : clusters) { + currentWeight += clusterWeight.getWeight(); + if (randomWeight < currentWeight) { + return clusterWeight.getClusterName(); + } + } + return StringUtils.EMPTY; + } + + /** + * route MatchType + * + * @author daizhenyu + * @since 2024-08-29 + **/ + private enum MatchType { + /** + * path match + */ + PATH, + /** + * header match + */ + HEADER, + /** + * path and header match + */ + BOTH + } +} diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsLoadBalancer.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsLoadBalancer.java new file mode 100644 index 0000000000..dc1ce46e58 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsLoadBalancer.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; + +/** + * XdsLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public interface XdsLoadBalancer { + /** + * select instance by loadbalancer + * + * @param instances service instance + * @return selected instance + */ + ServiceInstance selectInstance(List instances); +} diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsLoadBalancerFactory.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsLoadBalancerFactory.java new file mode 100644 index 0000000000..26f3de7d95 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsLoadBalancerFactory.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.entity.XdsLbPolicy; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +/** + * XdsLoadBalancerFactory + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsLoadBalancerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final Map LOAD_BALANCERS = new ConcurrentHashMap<>(); + + private static final String RANDOM = "RANDOM"; + + private static XdsLoadBalanceService loadBalanceService; + + static { + XdsCoreService coreService = ServiceManager.getService(XdsCoreService.class); + if (coreService != null) { + loadBalanceService = coreService.getLoadBalanceService(); + } else { + LOGGER.severe("xDS service not open for xDS routing."); + } + } + + private XdsLoadBalancerFactory() { + } + + /** + * getRoundRobinLoadBalancer + * + * @param clusterName cluster name + * @return XdsLoadBalancer + */ + private static XdsLoadBalancer getRoundRobinLoadBalancer(String clusterName) { + return LOAD_BALANCERS.computeIfAbsent(clusterName, key -> new XdsRoundRobinLoadBalancer()); + } + + /** + * getRandomLoadBalancer + * + * @return XdsLoadBalancer + */ + private static XdsLoadBalancer getRandomLoadBalancer() { + return LOAD_BALANCERS.computeIfAbsent(RANDOM, key -> new XdsRandomLoadBalancer()); + } + + /** + * getLoadBalancer + * + * @param clusterName cluster name + * @return XdsLoadBalancer + */ + public static XdsLoadBalancer getLoadBalancer(String clusterName) { + if (loadBalanceService == null) { + LOGGER.severe("xDS service not open for xDS routing."); + return getRoundRobinLoadBalancer(clusterName); + } + XdsLbPolicy lbPolicy = loadBalanceService.getLbPolicyOfCluster(clusterName); + switch (lbPolicy) { + case RANDOM: + return getRandomLoadBalancer(); + default: + return getRoundRobinLoadBalancer(clusterName); + } + } +} diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsRandomLoadBalancer.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsRandomLoadBalancer.java new file mode 100644 index 0000000000..4c39e35b82 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsRandomLoadBalancer.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; +import java.util.Random; + +/** + * XdsRandomLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRandomLoadBalancer implements XdsLoadBalancer { + private final Random random; + + /** + * Constructor + */ + public XdsRandomLoadBalancer() { + this.random = new Random(); + } + + @Override + public ServiceInstance selectInstance(List instances) { + // Select a random index from the list + int index = random.nextInt(instances.size()); + + // Return the randomly selected instance + return instances.get(index); + } +} diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsRoundRobinLoadBalancer.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsRoundRobinLoadBalancer.java new file mode 100644 index 0000000000..2ccbeff464 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/xds/lb/XdsRoundRobinLoadBalancer.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * XdsRoundRobinLoadBalancer + * + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRoundRobinLoadBalancer implements XdsLoadBalancer { + private final AtomicInteger index; + + /** + * constructor + */ + public XdsRoundRobinLoadBalancer() { + this.index = new AtomicInteger(0); + } + + @Override + public ServiceInstance selectInstance(List instances) { + synchronized (this.getClass()) { + // safely calculate the index based on the current size of the instances list + int currentIndex = index.getAndUpdate(i -> (i + 1) % instances.size()); + + // double-check size to avoid index out of bounds + if (currentIndex >= instances.size()) { + currentIndex = 0; + index.set(1); + } + + // return the instance at the current index + return instances.get(currentIndex); + } + } +} diff --git a/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/utils/XdsRouterUtilTest.java b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/utils/XdsRouterUtilTest.java new file mode 100644 index 0000000000..cb8c74748b --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/utils/XdsRouterUtilTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.utils; + +import io.sermant.core.config.ConfigManager; +import io.sermant.core.plugin.config.ServiceMeta; +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.utils.NetworkUtils; +import io.sermant.router.common.xds.TestServiceInstance; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * XdsRouterUtilTest + * + * @author daizhenyu + * @since 2024-08-31 + **/ +public class XdsRouterUtilTest { + private static XdsServiceDiscovery serviceDiscovery; + + private static MockedStatic serviceManager; + + private static MockedStatic networkUtils; + + private static MockedStatic configManager; + + @BeforeClass + public static void setUp() throws Exception { + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + serviceManager = Mockito.mockStatic(ServiceManager.class); + Mockito.when(ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + networkUtils = Mockito.mockStatic(NetworkUtils.class); + Mockito.when(NetworkUtils.getKubernetesPodIp()).thenReturn("127.0.0.1"); + + serviceDiscovery = Mockito.mock(XdsServiceDiscovery.class); + Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery); + Mockito.when(serviceDiscovery.getServiceInstance("serviceA")) + .thenReturn(createServiceInstance4Service(Arrays.asList("127.0.0.1", "host", "localhost"))); + + configManager = Mockito.mockStatic(ConfigManager.class); + } + + @AfterClass + public static void tearDown() throws Exception { + serviceManager.close(); + networkUtils.close(); + configManager.close(); + } + + @Test + public void testGetLocalityInfoOfSelfService() { + // not find matched service instance + ServiceMeta meta = new ServiceMeta(); + meta.setService("serviceB"); + Mockito.when(ConfigManager.getConfig(ServiceMeta.class)).thenReturn(meta); + Optional localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService(); + Assert.assertFalse(localityInfo.isPresent()); + + // find matched service instance + meta.setService("serviceA"); + localityInfo = XdsRouterUtils.getLocalityInfoOfSelfService(); + Assert.assertTrue(localityInfo.isPresent()); + Assert.assertEquals("127.0.0.1", localityInfo.get().getRegion()); + } + + private static Set createServiceInstance4Service(List hosts) { + Set serviceInstances = new HashSet<>(); + for (String host : hosts) { + TestServiceInstance serviceInstance = new TestServiceInstance(); + serviceInstance.setHost(host); + Map metaData = new HashMap<>(); + metaData.put("region", host); + serviceInstance.setMetaData(metaData); + serviceInstances.add(serviceInstance); + } + return serviceInstances; + } +} \ No newline at end of file diff --git a/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/TestServiceInstance.java b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/TestServiceInstance.java new file mode 100644 index 0000000000..7c30c8fe2f --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/TestServiceInstance.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds; + +import io.sermant.core.service.xds.entity.ServiceInstance; + +import java.util.Map; + +/** + * TestServiceInstance + * + * @author daizhenyu + * @since 2024-08-24 + **/ +public class TestServiceInstance implements ServiceInstance { + public TestServiceInstance() { + } + + private String cluster; + + private String service; + + private String host; + + private int port; + + private Map metaData; + + private boolean healthy; + + @Override + public String getClusterName() { + return cluster; + } + + @Override + public String getServiceName() { + return service; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + public Map getMetaData() { + return metaData; + } + + @Override + public boolean isHealthy() { + return healthy; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public void setService(String service) { + this.service = service; + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } + + public void setMetaData(Map metaData) { + this.metaData = metaData; + } + + public void setHealthy(boolean healthy) { + this.healthy = healthy; + } +} diff --git a/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/XdsRouterHandlerTest.java b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/XdsRouterHandlerTest.java new file mode 100644 index 0000000000..c77482c98b --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/XdsRouterHandlerTest.java @@ -0,0 +1,234 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds; + +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsRouteService; +import io.sermant.core.service.xds.XdsServiceDiscovery; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment; +import io.sermant.core.service.xds.entity.XdsHeaderMatcher; +import io.sermant.core.service.xds.entity.XdsLocality; +import io.sermant.core.service.xds.entity.XdsPathMatcher; +import io.sermant.core.service.xds.entity.XdsRoute; +import io.sermant.core.service.xds.entity.XdsRouteAction; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsClusterWeight; +import io.sermant.core.service.xds.entity.XdsRouteAction.XdsWeightedClusters; +import io.sermant.core.service.xds.entity.XdsRouteMatch; +import io.sermant.core.service.xds.entity.match.ExactMatchStrategy; +import io.sermant.router.common.utils.XdsRouterUtils; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * @author daizhenyu + * @since 2024-08-30 + **/ +public class XdsRouterHandlerTest { + private static final String CLUSTER_NAME = "outbound|8080||serviceA.default.svc.cluster.local"; + + private static XdsRouteService routeService; + + private static XdsServiceDiscovery serviceDiscovery; + + private static MockedStatic serviceManager; + + private static MockedStatic xdsRouterUtil; + + private static XdsLocality locality1; + + private static XdsLocality locality3; + + @BeforeClass + public static void setUp() { + routeService = Mockito.mock(XdsRouteService.class); + serviceDiscovery = Mockito.mock(XdsServiceDiscovery.class); + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + Mockito.when(xdsCoreService.getXdsRouteService()).thenReturn(routeService); + Mockito.when(xdsCoreService.getXdsServiceDiscovery()).thenReturn(serviceDiscovery); + Mockito.when(routeService.isLocalityRoute(CLUSTER_NAME)).thenReturn(true); + Mockito.when(routeService.getServiceRoute("serviceA")).thenReturn(createXdsRoute()); + + serviceManager = Mockito.mockStatic(ServiceManager.class); + Mockito.when(ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + + xdsRouterUtil = Mockito.mockStatic(XdsRouterUtils.class); + XdsLocality locality = new XdsLocality(); + locality.setRegion("test-region-1"); + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality)); + + Mockito.when(serviceDiscovery.getServiceInstance("serviceA")).thenReturn(createServiceInstance4Service()); + Mockito.when(serviceDiscovery.getClusterServiceInstance(CLUSTER_NAME)) + .thenReturn(Optional.of(createXdsClusterInstance(CLUSTER_NAME, + Arrays.asList("test-region-1", "test-region-2")))); + + locality1 = new XdsLocality(); + locality1.setRegion("test-region-1"); + + locality3 = new XdsLocality(); + locality3.setRegion("test-region-3"); + } + + @AfterClass + public static void tearDown() { + serviceManager.close(); + xdsRouterUtil.close(); + } + + @Test + public void testGetServiceInstanceByXdsRouteWithPath() { + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality1)); + + // path not match + Set instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test1"); + Assert.assertEquals(4, instances.size()); + + //path match and locality route + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test"); + Assert.assertEquals(1, instances.size()); + + //path match and not locality route + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality3)); + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test"); + Assert.assertEquals(2, instances.size()); + } + + @Test + public void testGetServiceInstanceByXdsRouteWithHeader() { + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality1)); + Map headers = new HashMap<>(); + headers.put("test", "test"); + + // header not match + Set instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", headers); + Assert.assertEquals(4, instances.size()); + + //header match and locality route + headers.put("version", "v1"); + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", headers); + Assert.assertEquals(1, instances.size()); + + //header match and not locality route + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality3)); + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", headers); + Assert.assertEquals(2, instances.size()); + } + + @Test + public void testGetServiceInstanceByXdsRouteWithPathAndHeader() { + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality1)); + + Map headers = new HashMap<>(); + headers.put("test", "test"); + // path and header all not match + Set instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test1", + headers); + Assert.assertEquals(4, instances.size()); + + // path match, header not match + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test", + headers); + Assert.assertEquals(4, instances.size()); + + // path not match, header match + headers.put("version", "v1"); + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test1", + headers); + Assert.assertEquals(4, instances.size()); + + //path and header all match and locality route + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test", headers); + Assert.assertEquals(1, instances.size()); + + //path and header all match and not locality route + Mockito.when(XdsRouterUtils.getLocalityInfoOfSelfService()).thenReturn(Optional.of(locality3)); + instances = XdsRouterHandler.INSTANCE.getServiceInstanceByXdsRoute("serviceA", "/test"); + Assert.assertEquals(2, instances.size()); + } + + private static List createXdsRoute() { + XdsRoute route = new XdsRoute(); + route.setName("test-route"); + + XdsRouteMatch xdsRouteMatch = new XdsRouteMatch(); + XdsPathMatcher pathMatcher = new XdsPathMatcher(new ExactMatchStrategy("/test"), true); + XdsHeaderMatcher headerMatcher = new XdsHeaderMatcher("version", new ExactMatchStrategy("v1")); + xdsRouteMatch.setCaseSensitive(true); + xdsRouteMatch.setPathMatcher(pathMatcher); + xdsRouteMatch.setHeaderMatchers(Arrays.asList(headerMatcher)); + + XdsRouteAction xdsRouteAction = new XdsRouteAction(); + xdsRouteAction.setWeighted(true); + XdsWeightedClusters weightedClusters = new XdsWeightedClusters(); + + XdsClusterWeight clusterWeight = new XdsClusterWeight(); + clusterWeight.setClusterName("outbound|8080||serviceA.default.svc.cluster.local"); + clusterWeight.setWeight(100); + + weightedClusters.setTotalWeight(100); + weightedClusters.setClusters(Arrays.asList(clusterWeight)); + xdsRouteAction.setWeightedClusters(weightedClusters); + + route.setRouteMatch(xdsRouteMatch); + route.setRouteAction(xdsRouteAction); + return Arrays.asList(route); + } + + private static XdsClusterLoadAssigment createXdsClusterInstance(String clusterName, List localityList) { + Map> localityInstances = new HashMap<>(); + for (String region : localityList) { + Set instances = new HashSet<>(); + TestServiceInstance testServiceInstance = new TestServiceInstance(); + Map metaData = new HashMap<>(); + metaData.put("region", region); + testServiceInstance.setMetaData(metaData); + instances.add(testServiceInstance); + XdsLocality locality = new XdsLocality(); + locality.setRegion(region); + localityInstances.put(locality, instances); + } + + XdsClusterLoadAssigment clusterInstance = new XdsClusterLoadAssigment(); + clusterInstance.setClusterName(clusterName); + clusterInstance.setLocalityInstances(localityInstances); + + return clusterInstance; + } + + private static Set createServiceInstance4Service() { + Set serviceInstances = new HashSet<>(); + serviceInstances.add(new TestServiceInstance()); + serviceInstances.add(new TestServiceInstance()); + serviceInstances.add(new TestServiceInstance()); + serviceInstances.add(new TestServiceInstance()); + return serviceInstances; + } +} \ No newline at end of file diff --git a/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsLoadBalancerFactoryTest.java b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsLoadBalancerFactoryTest.java new file mode 100644 index 0000000000..d0ccae388f --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsLoadBalancerFactoryTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import io.sermant.core.service.ServiceManager; +import io.sermant.core.service.xds.XdsCoreService; +import io.sermant.core.service.xds.XdsLoadBalanceService; +import io.sermant.core.service.xds.entity.XdsLbPolicy; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +/** + * XdsLoadBalancerFactoryTest + * + * @author daizhenyu + * @since 2024-09-10 + **/ +public class XdsLoadBalancerFactoryTest { + private static MockedStatic serviceManager; + + private static XdsLoadBalanceService loadBalanceService; + + @BeforeClass + public static void setUp() { + loadBalanceService = Mockito.mock(XdsLoadBalanceService.class); + + XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class); + serviceManager = Mockito.mockStatic(ServiceManager.class); + Mockito.when(ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService); + Mockito.when(xdsCoreService.getLoadBalanceService()).thenReturn(loadBalanceService); + } + + @AfterClass + public static void tearDown() { + serviceManager.close(); + } + + @Test + public void testGetLoadBalancer() { + // random + Mockito.when(loadBalanceService.getLbPolicyOfCluster("outbound|8080||serviceA.default.svc.cluster.local")) + .thenReturn(XdsLbPolicy.RANDOM); + XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory + .getLoadBalancer("outbound|8080||serviceA.default.svc.cluster.local"); + Assert.assertEquals("io.sermant.router.common.xds.lb.XdsRandomLoadBalancer", + loadBalancer.getClass().getCanonicalName()); + + // round robin + Mockito.when(loadBalanceService.getLbPolicyOfCluster("outbound|8080||serviceB.default.svc.cluster.local")) + .thenReturn(XdsLbPolicy.ROUND_ROBIN); + loadBalancer = XdsLoadBalancerFactory + .getLoadBalancer("outbound|8080||serviceB.default.svc.cluster.local"); + Assert.assertEquals("io.sermant.router.common.xds.lb.XdsRoundRobinLoadBalancer", + loadBalancer.getClass().getCanonicalName()); + Assert.assertEquals(loadBalancer, XdsLoadBalancerFactory + .getLoadBalancer("outbound|8080||serviceB.default.svc.cluster.local")); + } +} \ No newline at end of file diff --git a/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsRandomLoadBalancerTest.java b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsRandomLoadBalancerTest.java new file mode 100644 index 0000000000..48f8f3573b --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsRandomLoadBalancerTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.router.common.xds.TestServiceInstance; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * XdsRandomLoadBalancerTest + * + * @author daizhenyu + * @since 2024-09-10 + **/ +public class XdsRandomLoadBalancerTest { + + @Test + public void testSelectInstance() { + XdsLoadBalancer loadBalancer = new XdsRandomLoadBalancer(); + + TestServiceInstance instance1 = new TestServiceInstance(); + instance1.setService("service1"); + TestServiceInstance instance2 = new TestServiceInstance(); + instance2.setService("service2"); + List instances = new ArrayList<>(); + instances.add(instance1); + instances.add(instance2); + + ServiceInstance selectedInstance = loadBalancer.selectInstance(instances); + Assert.assertTrue(instances.contains(selectedInstance)); + } +} \ No newline at end of file diff --git a/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsRoundRobinLoadBalancerTest.java b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsRoundRobinLoadBalancerTest.java new file mode 100644 index 0000000000..48fabf6dd5 --- /dev/null +++ b/sermant-plugins/sermant-router/router-common/src/test/java/io/sermant/router/common/xds/lb/XdsRoundRobinLoadBalancerTest.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.common.xds.lb; + +import static org.junit.Assert.assertEquals; + +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.router.common.xds.TestServiceInstance; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author daizhenyu + * @since 2024-09-10 + **/ +public class XdsRoundRobinLoadBalancerTest { + @Test + public void testSelectInstance() { + XdsLoadBalancer loadBalancer = new XdsRoundRobinLoadBalancer(); + // create TestServiceInstance + TestServiceInstance instance1 = new TestServiceInstance(); + instance1.setService("service1"); + TestServiceInstance instance2 = new TestServiceInstance(); + instance2.setService("service2"); + + List instances = new ArrayList<>(); + instances.add(instance1); + instances.add(instance2); + + // first call + assertEquals(instance1, loadBalancer.selectInstance(instances)); + + // second call + assertEquals(instance2, loadBalancer.selectInstance(instances)); + + // third call + assertEquals(instance1, loadBalancer.selectInstance(instances)); + } +} \ No newline at end of file