diff --git a/changes/en-us/2.0.0.md b/changes/en-us/2.0.0.md
index 6f565947176..6768f6311cb 100644
--- a/changes/en-us/2.0.0.md
+++ b/changes/en-us/2.0.0.md
@@ -88,6 +88,8 @@ The version is updated as follows:
- [[#6018](https://github.com/seata/seata/pull/6018)] fix incorrect metric report
- [[#6024](https://github.com/seata/seata/pull/6024)] fix the white screen after click the "View Global Lock" button on the transaction info page in the console
- [[#6015](https://github.com/seata/seata/pull/6015)] fix can't integrate dubbo with spring
+- [[#6049](https://github.com/seata/seata/pull/6049)] fix registry type for raft under the network interruption did not carry out the sleep 1s
+- [[#6050](https://github.com/seata/seata/pull/6050)] change RaftServer#destroy to wait all shutdown procedures done
### optimize:
- [[#6033](https://github.com/seata/seata/pull/6033)] optimize the isReference judgment logic in HSFRemotingParser, remove unnecessary judgment about FactoryBean
@@ -152,6 +154,7 @@ The version is updated as follows:
- [[#5951](https://github.com/seata/seata/pull/5951)] remove un support config in jdk17
- [[#5959](https://github.com/seata/seata/pull/5959)] modify code style and remove unused import
- [[#6002](https://github.com/seata/seata/pull/6002)] remove fst serialization
+- [[#6045](https://github.com/seata/seata/pull/6045)] optimize derivative product check base on mysql
### security:
diff --git a/changes/zh-cn/2.0.0.md b/changes/zh-cn/2.0.0.md
index ef63a7db76c..08559915448 100644
--- a/changes/zh-cn/2.0.0.md
+++ b/changes/zh-cn/2.0.0.md
@@ -87,6 +87,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#6018](https://github.com/seata/seata/pull/6018)] 修复错误的 metric 上报
- [[#6024](https://github.com/seata/seata/pull/6024)] 修复控制台点击事务信息页面中的"查看全局锁"按钮之后白屏的问题
- [[#6015](https://github.com/seata/seata/pull/6015)] 修复在spring环境下无法集成dubbo
+- [[#6049](https://github.com/seata/seata/pull/6049)] 修复客户端在raft注册中心类型下,网络中断时,watch线程未暂停一秒等待重试的问题
+- [[#6050](https://github.com/seata/seata/pull/6050)] 修改 RaftServer#destroy 为等待所有关闭流程结束
### optimize:
@@ -153,6 +155,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#5951](https://github.com/seata/seata/pull/5951)] 删除在 jdk17 中不支持的配置项
- [[#5959](https://github.com/seata/seata/pull/5959)] 修正代码风格问题及去除无用的类引用
- [[#6002](https://github.com/seata/seata/pull/6002)] 移除fst序列化模块
+- [[#6045](https://github.com/seata/seata/pull/6045)] 优化MySQL衍生数据库判断逻辑
### security:
diff --git a/common/pom.xml b/common/pom.xml
index 85f38fc87bf..0d1d756ebbc 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -40,5 +40,10 @@
commons-lang
commons-lang
+
+ org.apache.httpcomponents
+ httpclient
+ provided
+
diff --git a/common/src/main/java/io/seata/common/util/HttpClientUtil.java b/common/src/main/java/io/seata/common/util/HttpClientUtil.java
new file mode 100644
index 00000000000..d31a4580707
--- /dev/null
+++ b/common/src/main/java/io/seata/common/util/HttpClientUtil.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.common.util;
+
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicNameValuePair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author funkye
+ */
+public class HttpClientUtil {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtil.class);
+
+ private static final Map HTTP_CLIENT_MAP = new ConcurrentHashMap<>();
+
+ private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
+ new PoolingHttpClientConnectionManager();
+
+ static {
+ POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
+ POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ })));
+ }
+
+ // post request
+ public static CloseableHttpResponse doPost(String url, Map params, Map header,
+ int timeout) throws IOException {
+ try {
+ URIBuilder builder = new URIBuilder(url);
+ URI uri = builder.build();
+ HttpPost httpPost = new HttpPost(uri);
+ if (header != null) {
+ header.forEach(httpPost::addHeader);
+ }
+ List nameValuePairs = new ArrayList<>();
+ params.forEach((k, v) -> {
+ nameValuePairs.add(new BasicNameValuePair(k, v));
+ });
+ String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);
+
+ StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
+ httpPost.setEntity(stringEntity);
+ httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
+ CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
+ k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
+ .setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
+ .setSocketTimeout(timeout).setConnectTimeout(timeout).build())
+ .build());
+ return client.execute(httpPost);
+ } catch (URISyntaxException | ClientProtocolException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ return null;
+ }
+
+ // get request
+ public static CloseableHttpResponse doGet(String url, Map param, Map header,
+ int timeout) throws IOException {
+ try {
+ URIBuilder builder = new URIBuilder(url);
+ if (param != null) {
+ for (String key : param.keySet()) {
+ builder.addParameter(key, param.get(key));
+ }
+ }
+ URI uri = builder.build();
+ HttpGet httpGet = new HttpGet(uri);
+ if (header != null) {
+ header.forEach(httpGet::addHeader);
+ }
+ CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
+ k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
+ .setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
+ .setSocketTimeout(timeout).setConnectTimeout(timeout).build())
+ .build());
+ return client.execute(httpGet);
+ } catch (URISyntaxException | ClientProtocolException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
diff --git a/discovery/seata-discovery-raft/src/main/java/io/seata/discovery/registry/raft/RaftRegistryServiceImpl.java b/discovery/seata-discovery-raft/src/main/java/io/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
index feb512bdda8..2a51b1fcedf 100644
--- a/discovery/seata-discovery-raft/src/main/java/io/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
+++ b/discovery/seata-discovery-raft/src/main/java/io/seata/discovery/registry/raft/RaftRegistryServiceImpl.java
@@ -17,7 +17,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,26 +40,15 @@
import io.seata.common.metadata.Node;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
+import io.seata.common.util.HttpClientUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigChangeListener;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.apache.http.HttpStatus;
-import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.client.utils.URLEncodedUtils;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,11 +78,6 @@ public class RaftRegistryServiceImpl implements RegistryService HTTP_CLIENT_MAP = new ConcurrentHashMap<>();
-
private static volatile String CURRENT_TRANSACTION_SERVICE_GROUP;
private static volatile String CURRENT_TRANSACTION_CLUSTER_NAME;
@@ -108,11 +91,6 @@ public class RaftRegistryServiceImpl implements RegistryService> ALIVE_NODES = new ConcurrentHashMap<>();
- static {
- POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
- POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
- }
-
private RaftRegistryServiceImpl() {}
/**
@@ -191,13 +169,6 @@ protected static void startQueryMetadata() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
CLOSED.compareAndSet(false, true);
REFRESH_METADATA_EXECUTOR.shutdown();
- HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
- try {
- client.close();
- } catch (IOException e) {
- LOGGER.error(e.getMessage(), e);
- }
- });
}));
}
}
@@ -274,13 +245,13 @@ private static boolean watch() {
for (String group : groupTerms.keySet()) {
String tcAddress = queryHttpAddress(clusterName, group);
try (CloseableHttpResponse response =
- doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
+ HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
}
- } catch (Exception e) {
- LOGGER.error("watch cluster fail: {}", e.getMessage());
+ } catch (IOException e) {
+ LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
@@ -320,7 +291,7 @@ private static void acquireClusterMetaData(String clusterName, String group) {
param.put("group", group);
String response = null;
try (CloseableHttpResponse httpResponse =
- doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
+ HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
if (httpResponse != null && httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
}
@@ -339,64 +310,6 @@ private static void acquireClusterMetaData(String clusterName, String group) {
}
}
- public static CloseableHttpResponse doGet(String url, Map param, Map header,
- int timeout) {
- CloseableHttpClient client;
- try {
- URIBuilder builder = new URIBuilder(url);
- if (param != null) {
- for (String key : param.keySet()) {
- builder.addParameter(key, param.get(key));
- }
- }
- URI uri = builder.build();
- HttpGet httpGet = new HttpGet(uri);
- if (header != null) {
- header.forEach(httpGet::addHeader);
- }
- client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
- k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
- .setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
- .setSocketTimeout(timeout).setConnectTimeout(timeout).build())
- .build());
- return client.execute(httpGet);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- return null;
- }
-
- public static CloseableHttpResponse doPost(String url, Map params, Map header,
- int timeout) {
- CloseableHttpClient client = null;
- try {
- URIBuilder builder = new URIBuilder(url);
- URI uri = builder.build();
- HttpPost httpPost = new HttpPost(uri);
- if (header != null) {
- header.forEach(httpPost::addHeader);
- }
- List nameValuePairs = new ArrayList<>();
- params.forEach((k, v) -> {
- nameValuePairs.add(new BasicNameValuePair(k, v));
- });
- String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);
-
- StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
- httpPost.setEntity(stringEntity);
- httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
- client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
- k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
- .setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
- .setSocketTimeout(timeout).setConnectTimeout(timeout).build())
- .build());
- return client.execute(httpPost);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- return null;
- }
-
@Override
public List lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java b/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java
index 48317a83516..6d930ea4009 100644
--- a/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java
+++ b/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java
@@ -19,7 +19,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import javax.sql.DataSource;
@@ -31,6 +30,7 @@
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import io.seata.rm.datasource.util.JdbcUtils;
import io.seata.sqlparser.util.JdbcConstants;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +55,16 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource
private String userName;
- private String version;
+ private String kernelVersion;
+
+ private String productVersion;
+
+ /**
+ * POLARDB-X 1.X -> TDDL
+ * POLARDB-X 2.X & MySQL 5.6 -> PXC
+ * POLARDB-X 2.X & MySQL 5.7 -> AliSQL-X
+ */
+ private static final String[] POLARDB_X_PRODUCT_KEYWORD = {"TDDL","AliSQL-X","PXC"};
/**
* Instantiates a new Data source proxy.
@@ -89,9 +98,9 @@ private void init(DataSource dataSource, String resourceGroupId) {
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
} else if (JdbcConstants.MYSQL.equals(dbType)) {
- getMySQLAdaptiveType(connection);
+ validMySQLVersion(connection);
+ checkDerivativeProduct();
}
- version = selectDbVersion(connection);
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
@@ -107,17 +116,31 @@ private void init(DataSource dataSource, String resourceGroupId) {
}
/**
- * get mysql adaptive type for PolarDB-X
+ * Define derivative product version for MySQL Kernel
*
- * @param connection db connection
*/
- private void getMySQLAdaptiveType(Connection connection) {
- try (Statement statement = connection.createStatement()) {
- statement.executeQuery("show rule");
+ private void checkDerivativeProduct() {
+ if (!JdbcConstants.MYSQL.equals(dbType)) {
+ return;
+ }
+ // check for polardb-x
+ if (isPolardbXProduct()) {
dbType = JdbcConstants.POLARDBX;
- } catch (SQLException e) {
- dbType = JdbcConstants.MYSQL;
+ return;
+ }
+ // check for other products base on mysql kernel
+ }
+
+ private boolean isPolardbXProduct() {
+ if (StringUtils.isBlank(productVersion)) {
+ return false;
+ }
+ for (String keyword : POLARDB_X_PRODUCT_KEYWORD) {
+ if (productVersion.contains(keyword)) {
+ return true;
+ }
}
+ return false;
}
/**
@@ -335,27 +358,33 @@ public BranchType getBranchType() {
return BranchType.AT;
}
- public String getVersion() {
- return version;
+ public String getKernelVersion() {
+ return kernelVersion;
}
- private String selectDbVersion(Connection connection) {
- if (JdbcConstants.MYSQL.equals(dbType) || JdbcConstants.POLARDBX.equals(dbType)) {
- try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT VERSION()");
- ResultSet versionResult = preparedStatement.executeQuery()) {
- if (versionResult.next()) {
- String version = versionResult.getString("VERSION()");
- if (version == null) {
- return null;
- }
- int dashIdx = version.indexOf('-');
- // in mysql: 5.6.45, in polardb-x: 5.6.45-TDDL-xxx
- return dashIdx > 0 ? version.substring(0, dashIdx) : version;
+ private void validMySQLVersion(Connection connection) {
+ if (!JdbcConstants.MYSQL.equals(dbType)) {
+ return;
+ }
+ try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT VERSION()");
+ ResultSet versionResult = preparedStatement.executeQuery()) {
+ if (versionResult.next()) {
+ String version = versionResult.getString("VERSION()");
+ if (StringUtils.isBlank(version)) {
+ return;
+ }
+ int dashIdx = version.indexOf('-');
+ // in mysql: 5.6.45, in polardb-x: 5.6.45-TDDL-xxx
+ if (dashIdx > 0) {
+ kernelVersion = version.substring(0, dashIdx);
+ productVersion = version.substring(dashIdx + 1);
+ } else {
+ kernelVersion = version;
+ productVersion = version;
}
- } catch (Exception e) {
- LOGGER.error("get mysql version fail error: {}", e.getMessage());
}
+ } catch (Exception e) {
+ LOGGER.error("check mysql version fail error: {}", e.getMessage());
}
- return "";
}
}
diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java
index 50cac9fa611..e307bc4f371 100644
--- a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java
+++ b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java
@@ -310,6 +310,6 @@ private String buildGroupBy(List pkColumns,List allSelectColumns
}
private String getDbVersion() {
- return statementProxy.getConnectionProxy().getDataSourceProxy().getVersion();
+ return statementProxy.getConnectionProxy().getDataSourceProxy().getKernelVersion();
}
}
diff --git a/server/src/main/java/io/seata/server/cluster/raft/RaftServer.java b/server/src/main/java/io/seata/server/cluster/raft/RaftServer.java
index 32872460bea..8ae7ee10a35 100644
--- a/server/src/main/java/io/seata/server/cluster/raft/RaftServer.java
+++ b/server/src/main/java/io/seata/server/cluster/raft/RaftServer.java
@@ -105,7 +105,14 @@ public void close() {
@Override
public void destroy() {
- Optional.ofNullable(raftGroupService).ifPresent(RaftGroupService::shutdown);
+ Optional.ofNullable(raftGroupService).ifPresent(r -> {
+ r.shutdown();
+ try {
+ r.join();
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted when RaftServer destroying", e);
+ }
+ });
}
}