diff --git a/build/pom.xml b/build/pom.xml
index 3d8709ee683..a04576aa48b 100644
--- a/build/pom.xml
+++ b/build/pom.xml
@@ -89,6 +89,7 @@
3.6.1
2.0
1.9.13
+ 8.1.0
5.8.2
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index b1afbb58d16..f733f2988f8 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -4,6 +4,8 @@ Add changes here for all PR submitted to the 2.x branch.
### feature:
+
+- [[#6756](https://github.com/apache/incubator-seata/pull/6756)] feature: add single server rate limit
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation
@@ -40,7 +42,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
- [PeppaO](https://github.com/PeppaO)
+- [xjlgod](https://github.com/xjlgod)
- [funky-eyes](https://github.com/funky-eyes)
-
Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
\ No newline at end of file
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 0cf918bf87a..48091415f1e 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -4,6 +4,7 @@
### feature:
+- [[#6756](https://github.com/apache/incubator-seata/pull/6756)] seata服务单点限流支持
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
@@ -39,6 +40,7 @@
- [GoodBoyCoder](https://github.com/GoodBoyCoder)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
+- [xjlgod](https://github.com/xjlgod)
- [PeppaO](https://github.com/PeppaO)
- [funky-eyes](https://github.com/funky-eyes)
diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
index 3bb4c9873ff..414ef0c19b2 100644
--- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
+++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
@@ -1111,6 +1111,31 @@ public interface ConfigurationKeys {
*/
String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata.";
+ /**
+ * The constant RATE_LIMIT_PREFIX.
+ */
+ String RATE_LIMIT_PREFIX = SERVER_PREFIX + "ratelimit";
+
+ /**
+ * The constant RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND.
+ */
+ String RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND = RATE_LIMIT_PREFIX + ".bucketTokenNumPerSecond";
+
+ /**
+ * The constant RATE_LIMIT_ENABLE.
+ */
+ String RATE_LIMIT_ENABLE = RATE_LIMIT_PREFIX + ".enable";
+
+ /**
+ * The constant RATE_LIMIT_BUCKET_TOKEN_MAX_NUM.
+ */
+ String RATE_LIMIT_BUCKET_TOKEN_MAX_NUM = RATE_LIMIT_PREFIX + ".bucketTokenMaxNum";
+
+ /**
+ * The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.
+ */
+ String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX + ".bucketTokenInitialNum";
+
/**
* The constant SERVER_REGISTRY_METADATA_PREFIX
*/
diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java
index 85ed496430d..228223a7ec8 100644
--- a/common/src/main/java/org/apache/seata/common/DefaultValues.java
+++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java
@@ -497,6 +497,12 @@ public interface DefaultValues {
*/
int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000;
+ /**
+ * The constant DEFAULT_RATE_LIMIT_ENABLE.
+ */
+ boolean DEFAULT_RATE_LIMIT_ENABLE = false;
+
+
/**
* The constant DEFAULT_RAFT_SSL_ENABLED.
*/
diff --git a/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java b/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java
index f56ad5cad43..fc3998d9868 100644
--- a/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java
+++ b/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java
@@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* The type Default failure handler.
*/
diff --git a/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java b/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java
new file mode 100644
index 00000000000..f11a7158506
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.event;
+
+public class RateLimitEvent implements Event {
+
+ /**
+ * The Trace id.
+ */
+ private String traceId;
+
+ /**
+ * The Limit type (like GlobalBeginFailed).
+ */
+ private String limitType;
+
+ /**
+ * The Application id.
+ */
+ private String applicationId;
+
+ /**
+ * The Client id.
+ */
+ private String clientId;
+
+ /**
+ * The Server ip address and port.
+ */
+ private String serverIpAddressAndPort;
+
+ public String getTraceId() {
+ return traceId;
+ }
+
+ public void setTraceId(String traceId) {
+ this.traceId = traceId;
+ }
+
+ public String getLimitType() {
+ return limitType;
+ }
+
+ public void setLimitType(String limitType) {
+ this.limitType = limitType;
+ }
+
+ public String getApplicationId() {
+ return applicationId;
+ }
+
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getServerIpAddressAndPort() {
+ return serverIpAddressAndPort;
+ }
+
+ public void setServerIpAddressAndPort(String serverIpAddressAndPort) {
+ this.serverIpAddressAndPort = serverIpAddressAndPort;
+ }
+
+ public RateLimitEvent(String traceId, String limitType, String applicationId, String clientId, String serverIpAddressAndPort) {
+ this.traceId = traceId;
+ this.limitType = limitType;
+ this.applicationId = applicationId;
+ this.clientId = clientId;
+ this.serverIpAddressAndPort = serverIpAddressAndPort;
+ }
+
+ @Override
+ public String toString() {
+ return "RateLimitEvent{" +
+ "traceId='" + traceId + '\'' +
+ ", limitType='" + limitType + '\'' +
+ ", applicationId='" + applicationId + '\'' +
+ ", clientId='" + clientId + '\'' +
+ ", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' +
+ '}';
+ }
+}
diff --git a/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java b/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java
index 16a2e899dc6..962a9486ec4 100644
--- a/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java
+++ b/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java
@@ -31,7 +31,6 @@ public enum TransactionExceptionCode {
* BeginFailed
*/
BeginFailed,
-
/**
* Lock key conflict transaction exception code.
*/
diff --git a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java
index 58adbfbce6f..43a2108f39d 100644
--- a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java
+++ b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java
@@ -22,6 +22,11 @@
*/
public interface MessageType {
+ /**
+ * The constant TYPE_NOT_EXIST.
+ */
+ short TYPE_NOT_EXIST = 0;
+
/**
* The constant TYPE_GLOBAL_BEGIN.
*/
diff --git a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java
index b1c7ef3e741..6039fbc6351 100644
--- a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java
+++ b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java
@@ -45,7 +45,8 @@ void getInt() {
@Test
void values() {
- Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, ResultCode.values());
+ Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success},
+ ResultCode.values());
}
@Test
diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index 2b567d6927f..a68d94704aa 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -68,6 +68,7 @@
6.3.0
1.0.0
1.82
+ 8.1.0
1.21
1.10.12
1.7.1
@@ -618,6 +619,11 @@
${jcommander.version}
+
+ com.bucket4j
+ bucket4j_jdk8-core
+ ${bucket4j.version}
+
io.grpc
grpc-testing
diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
index 288bd5dd782..e2b77967deb 100644
--- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
+++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
@@ -56,6 +56,7 @@
import org.apache.seata.tm.api.transaction.NoRollbackRule;
import org.apache.seata.tm.api.transaction.RollbackRule;
import org.apache.seata.tm.api.transaction.TransactionInfo;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java b/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java
index 507bafafee6..f6ee3e60280 100644
--- a/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java
+++ b/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java
@@ -25,6 +25,8 @@ public interface IdConstants {
String SEATA_EXCEPTION = "seata.exception";
+ String SEATA_RATE_LIMIT = "seata.rate.limit";
+
String APP_ID_KEY = "applicationId";
String GROUP_KEY = "group";
@@ -79,4 +81,9 @@ public interface IdConstants {
String STATUS_VALUE_AFTER_ROLLBACKED_KEY = "AfterRollbacked";
+ String LIMIT_TYPE_KEY = "limitType";
+
+ String CLIENT_ID_KEY = "clientId";
+
+ String HOST_AND_PORT = "hostAndPort";
}
diff --git a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java
index 869f38bf1a6..3f0234c060f 100644
--- a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java
+++ b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java
@@ -16,15 +16,14 @@
*/
package org.apache.seata.saga.engine.tm;
-import java.util.List;
-
import org.apache.seata.common.exception.FrameworkErrorCode;
+import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
-import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.ShutdownHook;
+import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.RMClient;
@@ -39,7 +38,6 @@
import org.apache.seata.tm.api.transaction.TransactionHook;
import org.apache.seata.tm.api.transaction.TransactionHookManager;
import org.apache.seata.tm.api.transaction.TransactionInfo;
-import org.apache.seata.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -49,6 +47,8 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
+import java.util.List;
+
/**
* Template of executing business logic with a global transaction for SAGA mode
*/
@@ -93,7 +93,6 @@ public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws Transac
triggerAfterBegin(tx);
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
-
}
return tx;
}
diff --git a/script/config-center/config.txt b/script/config-center/config.txt
index af831bc5d23..52b62cc3c13 100644
--- a/script/config-center/config.txt
+++ b/script/config-center/config.txt
@@ -162,7 +162,10 @@ server.raft.serialization=jackson
server.raft.compressor=none
server.raft.sync=true
-
+server.ratelimit.enable=false
+server.ratelimit.bucketTokenNumPerSecond = 999999
+server.ratelimit.bucketTokenMaxNum = 999999
+server.ratelimit.bucketTokenInitialNum = 999999
#Metrics configuration, only for the server
metrics.enabled=true
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
index db09911af18..cf356f8ee1c 100644
--- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java
@@ -71,6 +71,7 @@ public interface StarterConstants {
String SERVER_PREFIX = SEATA_PREFIX + ".server";
+ String SERVER_RATELIMIT_PREFIX = SERVER_PREFIX + ".ratelimit";
String SERVER_UNDO_PREFIX = SERVER_PREFIX + ".undo";
String SERVER_RAFT_PREFIX = SERVER_PREFIX + ".raft";
String SERVER_RECOVERY_PREFIX = SERVER_PREFIX + ".recovery";
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java
index 9749a003d81..ff459bf9190 100644
--- a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java
@@ -17,6 +17,8 @@
package org.apache.seata.spring.boot.autoconfigure;
import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerRateLimitProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.store.StoreProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.MetricsProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerProperties;
@@ -38,6 +40,7 @@
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.PROPERTY_BEAN_MAP;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_PREFIX;
+import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RECOVERY_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_UNDO_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX;
@@ -81,6 +84,7 @@ public static void init() {
PROPERTY_BEAN_MAP.put(SERVER_RAFT_PREFIX, ServerRaftProperties.class);
PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class);
PROPERTY_BEAN_MAP.put(STORE_PREFIX, StoreProperties.class);
+ PROPERTY_BEAN_MAP.put(SERVER_RATELIMIT_PREFIX, ServerRateLimitProperties.class);
}
}
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java
new file mode 100644
index 00000000000..d9249eede82
--- /dev/null
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.spring.boot.autoconfigure.properties.server;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX;
+
+@Component
+@ConfigurationProperties(prefix = SERVER_RATELIMIT_PREFIX)
+public class ServerRateLimitProperties {
+ /**
+ * whether enable server rate limit
+ */
+ private boolean enable;
+
+ /**
+ * limit token number of bucket per second
+ */
+ private Integer bucketTokenNumPerSecond;
+
+ /**
+ * limit token max number of bucket
+ */
+ private Integer bucketTokenMaxNum;
+
+ /**
+ * limit token initial number of bucket
+ */
+ private Integer bucketTokenInitialTime;
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+
+ public Integer getBucketTokenNumPerSecond() {
+ return bucketTokenNumPerSecond;
+ }
+
+ public void setBucketTokenNumPerSecond(Integer bucketTokenNumPerSecond) {
+ this.bucketTokenNumPerSecond = bucketTokenNumPerSecond;
+ }
+
+ public Integer getBucketTokenMaxNum() {
+ return bucketTokenMaxNum;
+ }
+
+ public void setBucketTokenMaxNum(Integer bucketTokenMaxNum) {
+ this.bucketTokenMaxNum = bucketTokenMaxNum;
+ }
+
+ public Integer getBucketTokenInitialTime() {
+ return bucketTokenInitialTime;
+ }
+
+ public void setBucketTokenInitialTime(Integer bucketTokenInitialTime) {
+ this.bucketTokenInitialTime = bucketTokenInitialTime;
+ }
+}
diff --git a/server/pom.xml b/server/pom.xml
index 5ba3e8dd9e9..7b8a4b1b94d 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -329,6 +329,13 @@
jackson-mapper-asl
${jackson-mapper.version}
+
+
+
+ com.bucket4j
+ bucket4j_jdk8-core
+ ${bucket4j.version}
+
diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
index 468e9ccc5ff..f6a5e6cb8b5 100644
--- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
+++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
@@ -66,6 +66,7 @@
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
import org.apache.seata.server.AbstractTCInboundHandler;
+import org.apache.seata.server.limit.LimitRequestDecorator;
import org.apache.seata.server.metrics.MetricsPublisher;
import org.apache.seata.server.session.BranchSession;
import org.apache.seata.server.session.GlobalSession;
@@ -643,7 +644,8 @@ public AbstractResultMessage onRequest(AbstractMessage request, RpcContext conte
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
- return transactionRequest.handle(context);
+ LimitRequestDecorator limitRequestDecorator = new LimitRequestDecorator(transactionRequest);
+ return limitRequestDecorator.handle(context);
}
@Override
diff --git a/server/src/main/java/org/apache/seata/server/limit/AbstractTransactionRequestHandler.java b/server/src/main/java/org/apache/seata/server/limit/AbstractTransactionRequestHandler.java
new file mode 100644
index 00000000000..55fff518d59
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/AbstractTransactionRequestHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit;
+
+import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC;
+import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse;
+import org.apache.seata.core.rpc.RpcContext;
+
+/**
+ * TransactionRequestLimitHandler
+ */
+public abstract class AbstractTransactionRequestHandler {
+
+ /**
+ * limit handler
+ */
+ protected AbstractTransactionRequestHandler abstractTransactionRequestHandler;
+
+ public AbstractTransactionRequestHandler() {
+ }
+
+ /**
+ * next handler handle
+ * @param context
+ * @return
+ */
+ protected AbstractTransactionResponse next(AbstractTransactionRequestToTC originRequest, RpcContext context) {
+ if (abstractTransactionRequestHandler != null) {
+ return abstractTransactionRequestHandler.next(originRequest, context);
+ }
+ return originRequest.handle(context);
+ }
+
+ public abstract AbstractTransactionResponse handle(AbstractTransactionRequestToTC originRequest, RpcContext context);
+
+ public void setTransactionRequestLimitHandler(AbstractTransactionRequestHandler abstractTransactionRequestHandler) {
+ this.abstractTransactionRequestHandler = abstractTransactionRequestHandler;
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java b/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java
new file mode 100644
index 00000000000..edfde9e2013
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit;
+
+import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC;
+import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse;
+import org.apache.seata.core.rpc.RpcContext;
+import org.apache.seata.server.limit.ratelimit.RateLimiterHandler;
+
+/**
+ * LimitRequestDecorator decorate AbstractTransactionRequestToTC to use limiter
+ */
+public class LimitRequestDecorator extends AbstractTransactionRequestToTC {
+
+ private AbstractTransactionRequestToTC originalRequest;
+
+ private AbstractTransactionRequestHandler requestLimitHandler;
+
+ public LimitRequestDecorator(AbstractTransactionRequestToTC originalRequest) {
+ this.originalRequest = originalRequest;
+
+ // create server rate limter
+ RateLimiterHandler rateLimiterHandler = RateLimiterHandler.getInstance();
+ rateLimiterHandler.setTransactionRequestLimitHandler(null);
+ requestLimitHandler = rateLimiterHandler;
+ }
+
+
+ @Override
+ public AbstractTransactionResponse handle(RpcContext rpcContext) {
+ return requestLimitHandler.handle(originalRequest, rpcContext);
+ }
+
+ @Override
+ public short getTypeCode() {
+ return originalRequest.getTypeCode();
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java
new file mode 100644
index 00000000000..1d1fd570fd8
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit.ratelimit;
+
+import org.apache.seata.common.util.UUIDGenerator;
+
+/**
+ * The type Rate limit info.
+ */
+public class RateLimitInfo {
+
+ /**
+ * The constant ROLE_TC.
+ */
+ public static final String GLOBAL_BEGIN_FAILED = "globalBeginFailed";
+
+ /**
+ * The Trace id.
+ */
+ private String traceId;
+
+ /**
+ * The Limit type (like GlobalBeginFailed).
+ */
+ private String limitType;
+
+ /**
+ * The Application id.
+ */
+ private String applicationId;
+
+ /**
+ * The Client id.
+ */
+ private String clientId;
+
+ /**
+ * The Server ip address and port.
+ */
+ private String serverIpAddressAndPort;
+
+ private RateLimitInfo() {
+ }
+
+ public static RateLimitInfo generateRateLimitInfo(String applicationId, String type,
+ String clientId, String serverIpAddressAndPort) {
+ RateLimitInfo rateLimitInfo = new RateLimitInfo();
+ rateLimitInfo.setTraceId(String.valueOf(UUIDGenerator.generateUUID()));
+ rateLimitInfo.setLimitType(type);
+ rateLimitInfo.setApplicationId(applicationId);
+ rateLimitInfo.setClientId(clientId);
+ rateLimitInfo.setServerIpAddressAndPort(serverIpAddressAndPort);
+ return rateLimitInfo;
+ }
+
+ public String getTraceId() {
+ return traceId;
+ }
+
+ public void setTraceId(String traceId) {
+ this.traceId = traceId;
+ }
+
+ public String getLimitType() {
+ return limitType;
+ }
+
+ public void setLimitType(String limitType) {
+ this.limitType = limitType;
+ }
+
+ public String getApplicationId() {
+ return applicationId;
+ }
+
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getServerIpAddressAndPort() {
+ return serverIpAddressAndPort;
+ }
+
+ public void setServerIpAddressAndPort(String serverIpAddressAndPort) {
+ this.serverIpAddressAndPort = serverIpAddressAndPort;
+ }
+
+ @Override
+ public String toString() {
+ return "RateLimitInfo{" +
+ "traceId='" + traceId + '\'' +
+ ", limitType='" + limitType + '\'' +
+ ", applicationId='" + applicationId + '\'' +
+ ", clientId='" + clientId + '\'' +
+ ", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' +
+ '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java
new file mode 100644
index 00000000000..ffddedd58e0
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit.ratelimit;
+
+/**
+ * RateLimiter
+ */
+public interface RateLimiter {
+ /**
+ * check whether the request can pass
+ *
+ * @return the boolean
+ */
+ boolean canPass();
+
+ /**
+ * reInit reinitialize the rate limiter
+ */
+ void reInit(RateLimiterHandlerConfig config);
+
+ /**
+ * obtainConfig obtain the config of rate limiter
+ *
+ * @return
+ */
+ RateLimiterHandlerConfig obtainConfig();
+
+ /**
+ * whether the rate limiter is enabled
+ *
+ * @return the boolean
+ */
+ boolean isEnable();
+}
diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java
new file mode 100644
index 00000000000..a52dd74dd8d
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit.ratelimit;
+
+import org.apache.seata.common.XID;
+import org.apache.seata.common.loader.EnhancedServiceLoader;
+import org.apache.seata.common.util.NumberUtils;
+import org.apache.seata.config.CachedConfigurationChangeListener;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationChangeEvent;
+import org.apache.seata.common.ConfigurationKeys;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.exception.TransactionExceptionCode;
+import org.apache.seata.core.protocol.MessageType;
+import org.apache.seata.core.protocol.ResultCode;
+import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC;
+import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse;
+import org.apache.seata.core.protocol.transaction.GlobalBeginResponse;
+import org.apache.seata.core.rpc.RpcContext;
+import org.apache.seata.server.limit.AbstractTransactionRequestHandler;
+import org.apache.seata.server.metrics.MetricsPublisher;
+
+/**
+ * RateLimiterHandler
+ */
+public class RateLimiterHandler extends AbstractTransactionRequestHandler implements CachedConfigurationChangeListener {
+ /**
+ * The instance of RateLimiterHandler
+ */
+ private static volatile RateLimiterHandler instance;
+
+ /**
+ * The instance of RateLimiter
+ */
+ private final RateLimiter rateLimiter;
+
+ /**
+ * The config of RateLimiterHandler
+ */
+ private final RateLimiterHandlerConfig config;
+
+ public RateLimiterHandler(RateLimiter rateLimiter) {
+ this.rateLimiter = rateLimiter;
+ this.config = new RateLimiterHandlerConfig();
+ }
+
+ private RateLimiterHandler() {
+ rateLimiter = EnhancedServiceLoader.load(RateLimiter.class);
+ config = rateLimiter.obtainConfig();
+
+ Configuration config = ConfigurationFactory.getInstance();
+ config.addConfigListener(ConfigurationKeys.RATE_LIMIT_ENABLE, this);
+ config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND, this);
+ config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM, this);
+ config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM, this);
+ }
+
+ @Override
+ public AbstractTransactionResponse handle(AbstractTransactionRequestToTC originRequest, RpcContext context) {
+ if (!rateLimiter.isEnable()) {
+ return next(originRequest, context);
+ }
+
+ if (MessageType.TYPE_GLOBAL_BEGIN == originRequest.getTypeCode()) {
+ if (!rateLimiter.canPass()) {
+ GlobalBeginResponse response = new GlobalBeginResponse();
+ response.setTransactionExceptionCode(TransactionExceptionCode.BeginFailed);
+ response.setResultCode(ResultCode.Failed);
+ RateLimitInfo rateLimitInfo = RateLimitInfo.generateRateLimitInfo(context.getApplicationId(),
+ RateLimitInfo.GLOBAL_BEGIN_FAILED, context.getClientId(), XID.getIpAddressAndPort());
+ MetricsPublisher.postRateLimitEvent(rateLimitInfo);
+ response.setMsg(String.format("TransactionException[rate limit exception, rate limit info: %s]", rateLimitInfo));
+ return response;
+ }
+ }
+ return next(originRequest, context);
+ }
+
+ public static RateLimiterHandler getInstance() {
+ if (instance == null) {
+ synchronized (RateLimiterHandler.class) {
+ if (instance == null) {
+ instance = new RateLimiterHandler();
+ }
+ }
+ }
+ return instance;
+ }
+
+ @Override
+ public void onChangeEvent(ConfigurationChangeEvent event) {
+ String dataId = event.getDataId();
+ String newValue = event.getNewValue();
+ if (ConfigurationKeys.RATE_LIMIT_ENABLE.equals(dataId)) {
+ config.setEnable(Boolean.parseBoolean(newValue));
+ }
+ if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND.equals(dataId)) {
+ config.setBucketTokenNumPerSecond(NumberUtils.toInt(newValue, config.getBucketTokenNumPerSecond()));
+ }
+ if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM.equals(dataId)) {
+ config.setBucketTokenMaxNum(NumberUtils.toInt(newValue, config.getBucketTokenMaxNum()));
+ }
+ if (ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.equals(dataId)) {
+ config.setBucketTokenInitialNum(NumberUtils.toInt(newValue, config.getBucketTokenInitialNum()));
+ }
+ rateLimiter.reInit(config);
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java
new file mode 100644
index 00000000000..89db918e2f1
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit.ratelimit;
+
+/**
+ * RateLimiterHandlerConfig
+ */
+public class RateLimiterHandlerConfig {
+ /**
+ * whether enable server rate limit
+ */
+ private boolean enable;
+
+ /**
+ * limit token number of bucket per second
+ */
+ private int bucketTokenNumPerSecond;
+
+ /**
+ * limit token max number of bucket
+ */
+ private int bucketTokenMaxNum;
+
+ /**
+ * limit token initial number of bucket
+ */
+ private int bucketTokenInitialNum;
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+
+ public int getBucketTokenNumPerSecond() {
+ return bucketTokenNumPerSecond;
+ }
+
+ public void setBucketTokenNumPerSecond(int bucketTokenNumPerSecond) {
+ this.bucketTokenNumPerSecond = bucketTokenNumPerSecond;
+ }
+
+ public int getBucketTokenMaxNum() {
+ return bucketTokenMaxNum;
+ }
+
+ public void setBucketTokenMaxNum(int bucketTokenMaxNum) {
+ this.bucketTokenMaxNum = bucketTokenMaxNum;
+ }
+
+ public int getBucketTokenInitialNum() {
+ return bucketTokenInitialNum;
+ }
+
+ public void setBucketTokenInitialNum(int bucketTokenInitialNum) {
+ this.bucketTokenInitialNum = bucketTokenInitialNum;
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java
new file mode 100644
index 00000000000..419b4fe7050
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.limit.ratelimit;
+
+import org.apache.seata.common.ConfigurationKeys;
+import org.apache.seata.common.executor.Initialize;
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.common.loader.Scope;
+import org.apache.seata.common.util.NumberUtils;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.Bucket;
+import io.github.bucket4j.Refill;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * TokenBucketLimiter based on Bucket4j
+ */
+@LoadLevel(name = "token-bucket-limiter", scope = Scope.SINGLETON)
+public class TokenBucketLimiter implements RateLimiter, Initialize {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketLimiter.class);
+
+ /**
+ * whether enable server rate limit
+ */
+ private boolean enable;
+
+ /**
+ * limit token number of bucket per second
+ */
+ private Integer bucketTokenNumPerSecond;
+
+ /**
+ * limit token max number of bucket
+ */
+ private Integer bucketTokenMaxNum;
+
+ /**
+ * limit token initial number of bucket
+ */
+ private Integer bucketTokenInitialNum;
+
+ /**
+ * the Bucket
+ */
+ private Bucket bucket;
+
+ private static final int DEFAULT_BUCKET_TOKEN_NUM_PER_SECOND = Integer.MAX_VALUE;
+ private static final int DEFAULT_BUCKET_TOKEN_MAX_NUM = Integer.MAX_VALUE;
+ private static final int DEFAULT_BUCKET_TOKEN_INITIAL_NUM = Integer.MAX_VALUE;
+
+ public TokenBucketLimiter() {}
+
+ public TokenBucketLimiter(boolean enable, Integer bucketTokenNumPerSecond,
+ Integer bucketTokenMaxNum, Integer bucketTokenInitialNum) {
+ this.enable = enable;
+ this.bucketTokenNumPerSecond = bucketTokenNumPerSecond;
+ this.bucketTokenMaxNum = bucketTokenMaxNum;
+ this.bucketTokenInitialNum = bucketTokenInitialNum;
+ initBucket();
+ }
+
+ @Override
+ public void init() {
+ final Configuration config = ConfigurationFactory.getInstance();
+ this.enable = config.getBoolean(ConfigurationKeys.RATE_LIMIT_ENABLE);
+ this.bucketTokenNumPerSecond = NumberUtils.toInt(
+ config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_NUM_PER_SECOND),
+ DEFAULT_BUCKET_TOKEN_NUM_PER_SECOND
+ );
+ this.bucketTokenMaxNum = NumberUtils.toInt(
+ config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM),
+ DEFAULT_BUCKET_TOKEN_MAX_NUM
+ );
+ this.bucketTokenInitialNum = NumberUtils.toInt(
+ config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM),
+ DEFAULT_BUCKET_TOKEN_INITIAL_NUM
+ );
+
+ if (this.enable) {
+ initBucket();
+ LOGGER.info("TokenBucketLimiter init success, bucketTokenNumPerSecond: {}, tokenMaxNum: {}, tokenInitialNum: {}",
+ this.bucketTokenNumPerSecond, this.bucketTokenMaxNum, this.bucketTokenInitialNum);
+ }
+ }
+
+ @Override
+ public boolean canPass() {
+ return bucket.tryConsume(1);
+ }
+
+ @Override
+ public void reInit(RateLimiterHandlerConfig config) {
+ this.enable = config.isEnable();
+ this.bucketTokenNumPerSecond = config.getBucketTokenNumPerSecond();
+ this.bucketTokenMaxNum = config.getBucketTokenMaxNum();
+ this.bucketTokenInitialNum = config.getBucketTokenInitialNum();
+
+ if (this.enable) {
+ initBucket();
+ LOGGER.info("TokenBucketLimiter reInit success, bucketTokenNumPerSecond: {}, tokenMaxNum: {}, tokenInitialNum: {}",
+ this.bucketTokenNumPerSecond, this.bucketTokenMaxNum, this.bucketTokenInitialNum);
+ return;
+ }
+ LOGGER.info("TokenBucketLimiter reInit success, The limiter is disabled");
+ }
+
+ @Override
+ public RateLimiterHandlerConfig obtainConfig() {
+ RateLimiterHandlerConfig config = new RateLimiterHandlerConfig();
+ config.setEnable(this.enable);
+ config.setBucketTokenNumPerSecond(this.bucketTokenNumPerSecond);
+ config.setBucketTokenMaxNum(this.bucketTokenMaxNum);
+ config.setBucketTokenInitialNum(this.bucketTokenInitialNum);
+ return config;
+ }
+
+ @Override
+ public boolean isEnable() {
+ return this.enable;
+ }
+
+ private void initBucket() {
+ Bandwidth limit = Bandwidth.classic(this.bucketTokenMaxNum, Refill.greedy(this.bucketTokenNumPerSecond,
+ Duration.ofSeconds(1)));
+ Bucket bucket = Bucket.builder().addLimit(limit).build();
+ if (this.bucketTokenInitialNum > 0) {
+ bucket.addTokens(this.bucketTokenInitialNum);
+ }
+ this.bucket = bucket;
+ }
+
+}
diff --git a/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java b/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java
index 18787594f8f..78ea4fca537 100644
--- a/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java
+++ b/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java
@@ -109,4 +109,8 @@ public interface MeterIdConstants {
Id SUMMARY_EXP = new Id(IdConstants.SEATA_EXCEPTION)
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY);
+
+ Id SUMMARY_RATE_LIMIT = new Id(IdConstants.SEATA_RATE_LIMIT)
+ .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC)
+ .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY);
}
diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java
index d7dc1beef13..0f51809986a 100644
--- a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java
+++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java
@@ -18,8 +18,10 @@
import org.apache.seata.core.event.EventBus;
import org.apache.seata.core.event.GlobalTransactionEvent;
+import org.apache.seata.core.event.RateLimitEvent;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.server.event.EventBusManager;
+import org.apache.seata.server.limit.ratelimit.RateLimitInfo;
import org.apache.seata.server.session.GlobalSession;
/**
@@ -94,4 +96,14 @@ public static void postSessionDoingEvent(final GlobalSession globalSession, Stri
globalSession.getTransactionName(), globalSession.getApplicationId(),
globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(), null, status, retryGlobal, retryBranch));
}
+
+ /**
+ * Post rate limit event.
+ *
+ * @param rateLimitInfo the rate limit info
+ */
+ public static void postRateLimitEvent(RateLimitInfo rateLimitInfo) {
+ EVENT_BUS.post(new RateLimitEvent(rateLimitInfo.getTraceId(), rateLimitInfo.getLimitType(), rateLimitInfo.getApplicationId(),
+ rateLimitInfo.getClientId(), rateLimitInfo.getServerIpAddressAndPort()));
+ }
}
diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java
index 2a937d0b98c..74d6fbe8816 100644
--- a/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java
+++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java
@@ -16,14 +16,10 @@
*/
package org.apache.seata.server.metrics;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-
import com.google.common.eventbus.Subscribe;
import org.apache.seata.core.event.ExceptionEvent;
import org.apache.seata.core.event.GlobalTransactionEvent;
+import org.apache.seata.core.event.RateLimitEvent;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.metrics.Id;
import org.apache.seata.metrics.registry.Registry;
@@ -31,8 +27,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
import static org.apache.seata.metrics.IdConstants.APP_ID_KEY;
+import static org.apache.seata.metrics.IdConstants.CLIENT_ID_KEY;
import static org.apache.seata.metrics.IdConstants.GROUP_KEY;
+import static org.apache.seata.metrics.IdConstants.LIMIT_TYPE_KEY;
+import static org.apache.seata.metrics.IdConstants.HOST_AND_PORT;
import static org.apache.seata.metrics.IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY;
import static org.apache.seata.metrics.IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY;
@@ -193,6 +197,15 @@ public void exceptionEventForMetrics(ExceptionEvent event) {
.withTag(APP_ID_KEY, event.getName())).increase(1);
}
+ @Subscribe
+ public void recordRateLimitEventForMetrics(RateLimitEvent event) {
+ registry.getSummary(MeterIdConstants.SUMMARY_RATE_LIMIT
+ .withTag(LIMIT_TYPE_KEY, event.getLimitType())
+ .withTag(APP_ID_KEY, event.getApplicationId())
+ .withTag(CLIENT_ID_KEY, event.getClientId())
+ .withTag(HOST_AND_PORT, event.getServerIpAddressAndPort())).increase(1);
+ }
+
@Override
public boolean equals(Object obj) {
return this.getClass().getName().equals(obj.getClass().getName());
diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter b/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter
new file mode 100644
index 00000000000..ef355142ff1
--- /dev/null
+++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seata.server.limit.ratelimit.TokenBucketLimiter
\ No newline at end of file
diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml
index 059312ae856..7231b11180c 100644
--- a/server/src/main/resources/application.example.yml
+++ b/server/src/main/resources/application.example.yml
@@ -159,6 +159,11 @@ seata:
session:
branch-async-queue-size: 5000 #branch async remove queue size
enable-branch-async-remove: false #enable to asynchronous remove branchSession
+ ratelimit:
+ enable: false
+ bucketTokenSecondNum: 999999
+ bucketTokenMaxNum: 999999
+ bucketTokenInitialNum: 999999
store:
# support: file 、 db 、 redis 、 raft
mode: file
diff --git a/server/src/main/resources/application.raft.example.yml b/server/src/main/resources/application.raft.example.yml
index 241820a1d21..b674e638476 100644
--- a/server/src/main/resources/application.raft.example.yml
+++ b/server/src/main/resources/application.raft.example.yml
@@ -131,6 +131,11 @@ seata:
session:
branch-async-queue-size: 5000 #branch async remove queue size
enable-branch-async-remove: false #enable to asynchronous remove branchSession
+ ratelimit:
+ enable: false
+ bucketTokenNumPerSecond: 999999
+ bucketTokenMaxNum: 999999
+ bucketTokenInitialNum: 999999
store:
# support: file
mode: raft
diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java
new file mode 100644
index 00000000000..4ef8d0bf4e5
--- /dev/null
+++ b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.ratelimiter;
+
+import org.apache.seata.core.protocol.transaction.GlobalBeginRequest;
+import org.apache.seata.core.rpc.RpcContext;
+import org.apache.seata.server.limit.ratelimit.RateLimiter;
+import org.apache.seata.server.limit.ratelimit.RateLimiterHandler;
+import org.apache.seata.server.limit.ratelimit.TokenBucketLimiter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+
+/**
+ * RateLimiterHandlerTest
+ */
+@SpringBootTest
+public class RateLimiterHandlerTest {
+
+ /**
+ * Logger for TokenBucketLimiterTest
+ **/
+ private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterHandlerTest.class);
+
+ private static RateLimiterHandler rateLimiterHandler;
+
+ @Test
+ public void testHandlePass() {
+ RateLimiter rateLimiter = new TokenBucketLimiter(true, 1,
+ 10, 10);
+ rateLimiterHandler = new RateLimiterHandler(rateLimiter);
+ GlobalBeginRequest request = new GlobalBeginRequest();
+ RpcContext rpcContext = new RpcContext();
+ Assertions.assertThrowsExactly(NullPointerException.class, () -> rateLimiterHandler.handle(request, rpcContext));
+ }
+
+ @Test
+ public void testHandleNotPass() {
+ RateLimiter rateLimiter = new TokenBucketLimiter(true, 1,
+ 1, 0);
+ rateLimiterHandler = new RateLimiterHandler(rateLimiter);
+ GlobalBeginRequest request = new GlobalBeginRequest();
+ RpcContext rpcContext = new RpcContext();
+ Assertions.assertThrowsExactly(NullPointerException.class, () -> rateLimiterHandler.handle(request, rpcContext));
+ }
+
+}
diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java
new file mode 100644
index 00000000000..7f386665ca5
--- /dev/null
+++ b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.ratelimiter;
+
+import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.server.limit.ratelimit.RateLimiter;
+import org.apache.seata.server.limit.ratelimit.TokenBucketLimiter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.util.StopWatch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * TokenBucketLimiterTest
+ */
+@SpringBootTest
+public class TokenBucketLimiterTest {
+
+ /**
+ * Logger for TokenBucketLimiterTest
+ **/
+ private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketLimiterTest.class);
+
+ @Test
+ public void testPerformanceOfTokenBucketLimiter() throws InterruptedException {
+ RateLimiter rateLimiter = new TokenBucketLimiter(true, 1,
+ 10, 10);
+ int threads = 10;
+ final int count = 100;
+ final CountDownLatch cnt = new CountDownLatch(count * threads);
+
+ final ThreadPoolExecutor service1 = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
+ new SynchronousQueue(), new NamedThreadFactory("test1", false));
+ AtomicInteger totalPass = new AtomicInteger();
+ AtomicInteger totalReject = new AtomicInteger();
+ StopWatch totalStopWatch = new StopWatch();
+ totalStopWatch.start();
+ for (int i = 0; i < threads; i++) {
+ service1.execute(() -> {
+ int pass = 0;
+ int reject = 0;
+ StopWatch w = new StopWatch();
+ w.start();
+ for (int u = 0; u < count; u++) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ boolean result = rateLimiter.canPass();
+ if (result) {
+ pass++;
+ totalPass.getAndIncrement();
+ } else {
+ reject++;
+ totalReject.getAndIncrement();
+ }
+ cnt.countDown();
+ }
+ w.stop();
+ LOGGER.info("total time:{}ms, pass:{}, reject:{}", w.getLastTaskTimeMillis(), pass, reject);
+ });
+ }
+ cnt.await();
+ totalStopWatch.stop();
+ LOGGER.info("total time:{}ms, total pass:{}, total reject:{}", totalStopWatch.getLastTaskTimeMillis(),
+ totalPass.get(), totalReject.get());
+ Assertions.assertNotEquals(0, totalReject.get());
+ }
+}
diff --git a/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter b/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter
new file mode 100644
index 00000000000..ef355142ff1
--- /dev/null
+++ b/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seata.server.limit.ratelimit.TokenBucketLimiter
\ No newline at end of file
diff --git a/server/src/test/resources/file.conf b/server/src/test/resources/file.conf
index 422c6dd5836..1a03672ddf6 100644
--- a/server/src/test/resources/file.conf
+++ b/server/src/test/resources/file.conf
@@ -56,6 +56,12 @@ server {
#schedule delete expired undo_log in milliseconds
logDeletePeriod = 86400000
}
+ ratelimit {
+ enable = false
+ bucketTokenNumPerSecond = 999999
+ bucketTokenMaxNum = 999999
+ bucketTokenInitialNum = 999999
+ }
}
## metrics settings
metrics {