Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: redis registry push empty protection optimize #6164

Merged
merged 24 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6034](https://github.com/seata/seata/pull/6034)] using namespace from command line when deployment with helm charts
- [[#6116](https://github.com/seata/seata/pull/6034)] remove lgtm.com stuff
- [[#6145](https://github.com/seata/seata/pull/6145)] upgrade jettison to 1.5.4
- [[#6164](https://github.com/seata/seata/pull/6164)] redis registry push empty protection optimize

### security:
- [[#6069](https://github.com/seata/seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- [[#6098](https://github.com/seata/seata/pull/6098)] 优化acquireMetadata方法的重试逻辑
- [[#6034](https://github.com/seata/seata/pull/6034)] 使用helm图表进行部署时使用命令行中的命名空间
- [[#6116](https://github.com/seata/seata/pull/6034)] 移除 lgtm.com
- [[#6164](https://github.com/seata/seata/pull/6164)] redis 注册中心推空保护优化

### security:
- [[#6069](https://github.com/seata/seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞
Expand Down
2 changes: 1 addition & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<apache-zookeeper.version>3.5.9</apache-zookeeper.version>
<curator-test.version>5.1.0</curator-test.version>
<spring-context-support.version>1.0.2</spring-context-support.version>
<mock-jedis.version>0.3.0</mock-jedis.version>
<mock-jedis.version>0.3.1</mock-jedis.version>
<apollo-client.version>2.0.1</apollo-client.version>
<eureka-clients.version>1.10.18</eureka-clients.version>
<jettison.version>1.5.4</jettison.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.config.nacos;
package io.seata.discovery.registry.nacos;

import java.lang.reflect.Method;
import java.util.Properties;
Expand Down
5 changes: 5 additions & 0 deletions discovery/seata-discovery-redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>com.github.microwww</groupId>
<artifactId>redis-server</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -29,8 +27,10 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import io.seata.common.ConfigurationKeys;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
Expand Down Expand Up @@ -71,7 +71,7 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
private static volatile JedisPool jedisPool;

// redis registry key live 5 seconds, auto refresh key every 2 seconds
private static final int KEY_TTL = 5;
private static final long KEY_TTL = 5L;
private static final long KEY_REFRESH_PERIOD = 2000L;

private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1,
Expand Down Expand Up @@ -187,7 +187,7 @@ public void unregister(InetSocketAddress address) {
@Override
public void subscribe(String cluster, RedisListener listener) {
String redisRegistryKey = REDIS_FILEKEY_PREFIX + cluster;
LISTENER_SERVICE_MAP.computeIfAbsent(cluster, key -> new ArrayList<>())
CollectionUtils.computeIfAbsent(LISTENER_SERVICE_MAP, cluster, key -> new ArrayList<>())
.add(listener);


Expand Down Expand Up @@ -240,17 +240,45 @@ List<InetSocketAddress> lookupByCluster(String clusterName) {
String eventType = msgr[1];
switch (eventType) {
case RedisListener.REGISTER:
CLUSTER_ADDRESS_MAP.get(clusterName).add(NetUtil.toInetSocketAddress(serverAddr));
CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, clusterName, value -> ConcurrentHashMap.newKeySet(2))
.add(NetUtil.toInetSocketAddress(serverAddr));
break;
case RedisListener.UN_REGISTER:
CLUSTER_ADDRESS_MAP.get(clusterName).remove(NetUtil.toInetSocketAddress(serverAddr));
removeServerAddressByPushEmptyProtection(clusterName, serverAddr);
break;
default:
throw new ShouldNeverHappenException("unknown redis msg:" + msg);
}
});
}
return new ArrayList<>(CLUSTER_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptySet()));
return new ArrayList<>(CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, clusterName, value -> ConcurrentHashMap.newKeySet(2)));
}

/**
*
* if the serverAddr is unique in the address list and
* the callback cluster is current cluster, then enable push-empty protection
* Otherwise, remove it.
*
* @param notifyCluserName notifyCluserName
* @param serverAddr serverAddr
*/
private void removeServerAddressByPushEmptyProtection(String notifyCluserName, String serverAddr) {

Set<InetSocketAddress> socketAddresses = CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, notifyCluserName, value -> ConcurrentHashMap.newKeySet(2));
InetSocketAddress inetSocketAddress = NetUtil.toInetSocketAddress(serverAddr);
if (socketAddresses.size() == 1 && socketAddresses.contains(inetSocketAddress)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请考虑并发性问题
Please consider concurrency issues

String txServiceGroupName = ConfigurationFactory.getInstance()
.getConfig(ConfigurationKeys.TX_SERVICE_GROUP);

if (StringUtils.isNotEmpty(txServiceGroupName)) {
String clusterName = getServiceGroup(txServiceGroupName);
if (notifyCluserName.equals(clusterName)) {
return;
}
}
}
CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress);
}

@Override
Expand Down Expand Up @@ -291,7 +319,7 @@ private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, Strin
scanParams.count(10);
scanParams.match(redisRegistryKey + "_*");
String cursor = ScanParams.SCAN_POINTER_START;
Set<InetSocketAddress> newAddressSet = new HashSet<>();
Set<InetSocketAddress> newAddressSet = ConcurrentHashMap.newKeySet(2);
do {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
cursor = scanResult.getCursor();
Expand All @@ -307,7 +335,7 @@ private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, Strin
}
} while (!cursor.equals(ScanParams.SCAN_POINTER_START));

if (!newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
if (CollectionUtils.isNotEmpty(newAddressSet) && !newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
package io.seata.discovery.registry.redis;

import com.github.microwww.redis.RedisServer;
import io.seata.common.util.NetUtil;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import org.junit.jupiter.api.*;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved

/**
* @author laywin
*/
public class RedisRegisterServiceImplTest {

Logger logger = LoggerFactory.getLogger(getClass());

private static RedisRegistryServiceImpl redisRegistryService;

private static RedisServer server;


@BeforeAll
public static void init() throws IOException {
System.setProperty("config.type", "file");
System.setProperty("config.file.name", "file.conf");
System.setProperty("txServiceGroup", "default_tx_group");
System.setProperty("service.vgroupMapping.default_tx_group", "default");
System.setProperty("registry.redis.serverAddr", "127.0.0.1:6789");
System.setProperty("registry.redis.cluster", "default");
RedisServer server = new RedisServer();
server.listener("127.0.0.1", 6789);
redisRegistryService = RedisRegistryServiceImpl.getInstance();
}

@Test
public void testFlow() {

redisRegistryService.lookup("default_tx_group");

redisRegistryService.register(new InetSocketAddress(NetUtil.getLocalIp(), 8091));

redisRegistryService.unregister(new InetSocketAddress(NetUtil.getLocalIp(), 8091));

Assertions.assertTrue(true);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testRemoveServerAddressByPushEmptyProtection()
throws NoSuchFieldException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {

MockedStatic<ConfigurationFactory> configurationFactoryMockedStatic = mockStatic(ConfigurationFactory.class);
Configuration configuration = mock(Configuration.class);
when(configuration.getConfig(anyString())).thenReturn("cluster");

configurationFactoryMockedStatic.when(ConfigurationFactory::getInstance).thenReturn(configuration);

Field field = RedisRegistryServiceImpl.class.getDeclaredField("CLUSTER_ADDRESS_MAP");
field.setAccessible(true);

ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP = (ConcurrentMap<String, Set<InetSocketAddress>>)field.get(null);
CLUSTER_ADDRESS_MAP.put("cluster", Sets.newSet(NetUtil.toInetSocketAddress("127.0.0.1:8091")));

Method method = RedisRegistryServiceImpl.class.getDeclaredMethod("removeServerAddressByPushEmptyProtection", String.class, String.class);
method.setAccessible(true);
method.invoke(redisRegistryService, "cluster", "127.0.0.1:8091");

// test the push empty protection situation
Assertions.assertEquals(1, CLUSTER_ADDRESS_MAP.get("cluster").size());


when(configuration.getConfig(anyString())).thenReturn("mycluster");

method.invoke(redisRegistryService, "cluster", "127.0.0.1:8091");
configurationFactoryMockedStatic.close();

// test the normal remove situation
Assertions.assertEquals(0, CLUSTER_ADDRESS_MAP.get("cluster").size());
}

@AfterAll
public static void afterAll() {
if (server != null) {
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Loading