title | date | order | categories | tags | permalink | ||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
流量控制 |
2020-01-20 03:06:00 -0800 |
3 |
|
|
/pages/60bb6d/ |
在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。
限流可以认为是服务降级的一种。限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。
固定窗口限流算法的基本策略是:
- 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
- 为每个固定时间窗口设置一个计数器,用于统计请求数;
- 一旦请求数超过最大请求数,则请求会被拦截。
固定窗口限流算法的优点是:实现简单。
固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。
【示例】Java 版本的固定窗口限流算法
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SlidingWindowRateLimiter implements RateLimiter {
/**
* 允许的最大请求数
*/
private final long maxPermits;
/**
* 窗口期时长
*/
private long periodMillis;
/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;
/**
* 窗口期截止时间
*/
private long lastPeriodMillis;
/**
* 分片窗口数
*/
private final int shardNum;
/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);
/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();
public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}
public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}
@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}
}
滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。
滑动窗口限流算法的基本策略是:
- 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
- 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
- 要保证所有子窗口的统计数之和不能超过阈值。
滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。
滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。
滑动窗口限流算法的缺点是:
- 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
- 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。
【示例】Java 版本的滑动窗口限流算法
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SlidingWindowRateLimiter implements RateLimiter {
/**
* 允许的最大请求数
*/
private final long maxPermits;
/**
* 窗口期时长
*/
private final long periodMillis;
/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;
/**
* 窗口期截止时间
*/
private long lastPeriodMillis;
/**
* 分片窗口数
*/
private final int shardNum;
/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);
/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();
public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}
public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}
@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}
}
漏桶限流算法的基本策略是:
- 水(请求)以任意速率由入口进入到漏桶中;
- 水以固定的速率由出口出水(请求通过);
- 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。
漏桶限流算法的优点是:消费速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。
漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。
漏桶策略适用于间隔性突发流量且流量不用即时处理的场景。
【示例】Java 版本的漏桶限流算法
import java.util.concurrent.atomic.AtomicLong;
public class LeakyBucketRateLimiter implements RateLimiter {
/**
* QPS
*/
private final int qps;
/**
* 桶的容量
*/
private final long capacity;
/**
* 计算的起始时间
*/
private long beginTimeMillis;
/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);
public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}
@Override
public synchronized boolean tryAcquire(int permits) {
// 如果桶中没有水,直接通过
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}
// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));
// 重置时间
beginTimeMillis = System.currentTimeMillis();
if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}
}
令牌桶算法的原理:
- 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
- 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
- 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理
因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。
规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。
令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景。
【示例】Java 实现令牌桶算法
import java.util.concurrent.atomic.AtomicLong;
/**
* 令牌桶限流算法
*
* @author <a href="mailto:[email protected]">Zhang Peng</a>
* @date 2024-01-18
*/
public class TokenBucketRateLimiter implements RateLimiter {
/**
* QPS
*/
private final long qps;
/**
* 桶的容量
*/
private final long capacity;
/**
* 上一次令牌发放时间
*/
private long endTimeMillis;
/**
* 桶中当前的令牌数量
*/
private final AtomicLong tokenNum = new AtomicLong(0);
public TokenBucketRateLimiter(long qps, long capacity) {
this.qps = qps;
this.capacity = capacity;
this.endTimeMillis = System.currentTimeMillis();
}
@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
long gap = now - endTimeMillis;
// 计算令牌数
long newTokenNum = (gap * qps / 1000);
long currentTokenNum = tokenNum.get() + newTokenNum;
tokenNum.set(Math.min(capacity, currentTokenNum));
if (tokenNum.get() < permits) {
return false;
} else {
tokenNum.addAndGet(-permits);
endTimeMillis = now;
return true;
}
}
}
扩展
Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class RateLimiterDemo {
public static void main(String[] args) {
// ============================================================================
int qps = 20;
System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);
System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);
System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);
System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}
private static void testRateLimit(RateLimiter rateLimiter, int qps) {
AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();
int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}
try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}
private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}
}
前文中,基于 Java 实现的限流算法示例只能运行在单节点,无法有效应对集群部署的服务,这中场景下就需要分布式限流。
实现分布式限流的一种简单解决方案是使用 Redis + Lua 来实现。使用二者来开发的原因是:1. Redis 的性能极高;2. Redis 支持以原子操作的方式执行 Lua 脚本。
Redis + Lua 实现的固定窗口限流算法实现思路:
- 根据实际需要,将当前时间格式化为天(
yyyyMMdd
)、时(yyyyMMddHH
)、分(yyyyMMddHHmm
)、秒(yyyyMMddHHmmss
),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量; - 用于代表不同粒度的时间窗口按需设置过期时间;
- 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。
【示例】Redis + Lua 实现的固定窗口限流算法
下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。
private final String key = "rate:limit:202401222100";
private final int limit = 100;
private final int seconds = 60;
public boolean tryAcquire(int permits) {
// -- 缓存 Key
// local key = KEYS[1]
// -- 访问请求数
// local permits = tonumber(ARGV[1])
// -- 过期时间
// local seconds = tonumber(ARGV[2])
// -- 限流阈值
// local limit = tonumber(ARGV[3])
//
// -- 获取统计值
// local count = tonumber(redis.call('GET', key) or "0")
//
// if count + permits > limit then
// -- 触发限流
// return 0
// else
// redis.call('INCRBY', key, permits)
// redis.call('EXPIRE', key, seconds)
// return count + permits
// end
String script =
"-- 缓存 Key\n"
+ "local key = KEYS[1]\n"
+ "-- 访问请求数\n"
+ "local permits = tonumber(ARGV[1])\n"
+ "-- 过期时间\n"
+ "local seconds = tonumber(ARGV[2])\n"
+ "-- 限流阈值\n"
+ "local limit = tonumber(ARGV[3])\n"
+ "\n"
+ "-- 获取统计值\n"
+ "local count = tonumber(redis.call('GET', key) or \"0\")\n"
+ "\n"
+ "if count + permits > limit then\n"
+ " -- 触发限流\n"
+ " return 0\n"
+ "else\n"
+ " redis.call('INCRBY', key, permits)\n"
+ " redis.call('EXPIRE', key, seconds)\n"
+ " return count + permits\n"
+ "end";
List<String> keys = Collections.singletonList(key);
List<String> args = Arrays.asList(String.valueOf(permits), String.valueOf(seconds), String.valueOf(limit));
Object eval = jedis.eval(script, keys, args);
long value = (long) eval;
return value != 0;
}
@Test
public void test() {
for (int i = 0; i < 11; i++) {
if (tryAcquire(10)) {
System.out.println("请求成功");
} else {
System.out.println("请求失败");
}
}
}
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求失败
// rate:limit:202401222100 统计值达到 100
【示例】基于 Redis Lua 令牌桶限流算法实现
-- 令牌桶限流
-- 令牌的唯一标识
local bucketKey = KEYS[1]
-- 上次请求的时间
local last_mill_request_key = KEYS[2]
-- 令牌桶的容量
local limit = tonumber(ARGV[1])
-- 请求令牌的数量
local permits = tonumber(ARGV[2])
-- 令牌流入的速率
local rate = tonumber(ARGV[3])
-- 当前时间
local curr_mill_time = tonumber(ARGV[4])
-- 添加令牌
-- 获取当前令牌的数量
local current_limit = tonumber(redis.call('get', bucketKey) or "0")
-- 获取上次请求的时间
local last_mill_request_time = tonumber(redis.call('get', last_mill_request_key) or "0")
-- 计算向桶里添加令牌的数量
if last_mill_request_time == 0 then
-- 令牌桶初始化
-- 更新上次请求时间
redis.call("HSET", last_mill_request_key, curr_mill_time)
return 0
else
local add_token_num = math.floor((curr_mill_time - last_mill_request_time) * rate)
end
-- 更新令牌的数量
if current_limit + add_token_num > limit then
current_limit = limit
else
current_limit = current_limit + add_token_num
end
redis.pcall("HSET",bucketKey, current_limit)
-- 设置过期时间
redis.call("EXPIRE", bucketKey, 2)
-- 限流判断
if current_limit - permits < 1 then
-- 达到限流大小
return 0
else
-- 没有达到限流大小
current_limit = current_limit - permits
redis.pcall("HSET", bucketKey, current_limit)
-- 设置过期时间
redis.call("EXPIRE", bucketKey, 2)
-- 更新上次请求的时间
redis.call("HSET", last_mill_request_key, curr_mill_time)
end
前面介绍了限流算法的基本原理和一些简单的实现。但在生产环境,我们一般应该使用更成熟的限流工具。
-
Guava 的
RateLimiter
:RateLimiter 基于漏桶算法,但它参考了令牌桶算法。具体用法可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法 - Hystrix:经典的限流、熔断工具,很值得借鉴学习。注:官方已停止发布版本。
- Sentinel:阿里的限流、熔断工具。