This article introduces several common rate-limiting algorithms and shows how to implement them in Java.
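Counters

Fixed Window Counter

A straightforward way to control traffic is to count the requests that arrive within a time window. If the count has not yet reached the threshold, the request is admitted; otherwise it is rejected. When the window ends, the counter is simply reset to zero. This is the so-called fixed window counter. The diagram below illustrates the algorithm: in time window 3 the counter has already reached the threshold of 5, so the 6th request is rate-limited.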
In the implementation, a timer thread can reset the counter to zero whenever a new window begins. The Java implementation is as follows:
```java
import java.util.concurrent.atomic.AtomicInteger;

public class FixedWindowCounterLimiter {
    /** Window length in seconds */
    private final int windowSize;
    /** Maximum number of requests allowed per window */
    private final int limit;
    /** Number of requests seen in the current window */
    private final AtomicInteger count;

    public FixedWindowCounterLimiter(int windowSize, int limit) {
        this.windowSize = windowSize;
        this.limit = limit;
        this.count = new AtomicInteger(0);

        // Background thread that resets the counter at the end of every window
        Thread resetter = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(windowSize * 1000L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop resetting
                    Thread.currentThread().interrupt();
                    return;
                }
                count.set(0);
            }
        });
        // Daemon thread, so the limiter does not keep the JVM alive
        resetter.setDaemon(true);
        resetter.start();
    }

    public boolean tryAcquire() {
        int num = count.incrementAndGet();
        // Admit only while the count for this window is within the limit
        return num <= limit;
    }
}
```
The test code is as follows:
```java
@Test
public void test1() throws Exception {
    // 2-second window, at most 5 requests per window
    FixedWindowCounterLimiter rateLimiter = new FixedWindowCounterLimiter(2, 5);

    // First batch: 3 requests, below the limit
    int allNum = 3;
    int passNum = 0;
    int blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);

    // Wait 5 seconds so the counter has been reset
    Thread.sleep(5000);

    // Second batch: 14 requests, above the limit
    allNum = 14;
    passNum = 0;
    blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);
}
```
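The test results are shown below and match expectations. All 3 requests in the first batch pass. In the second batch, 5 of the 14 requests pass and 9 are blocked.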
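The reset thread is only one way to start a new window. Here is a sketch of an alternative. It assumes the windows can be aligned to multiples of their length since the epoch. The window index is then computed from the current time on each call, so the first request of a new window resets the counter lazily and no background thread is needed. The class name LazyFixedWindowLimiter and the injectable LongSupplier clock are hypothetical. They are introduced only so the behavior can be tested without sleeping.

```java
import java.util.function.LongSupplier;

// Hypothetical variant: the window is derived from the clock, so no reset thread is needed.
final class LazyFixedWindowLimiter {
    private final long windowMillis;   // window length in milliseconds
    private final int limit;           // maximum requests per window
    private final LongSupplier clock;  // time source in milliseconds (injectable for tests)
    private long currentWindow = Long.MIN_VALUE; // index of the window being counted
    private int count;                 // requests admitted in currentWindow

    LazyFixedWindowLimiter(long windowMillis, int limit, LongSupplier clock) {
        this.windowMillis = windowMillis;
        this.limit = limit;
        this.clock = clock;
    }

    LazyFixedWindowLimiter(long windowMillis, int limit) {
        this(windowMillis, limit, System::currentTimeMillis);
    }

    synchronized boolean tryAcquire() {
        // Windows are aligned to multiples of windowMillis since the epoch
        long window = Math.floorDiv(clock.getAsLong(), windowMillis);
        if (window != currentWindow) {
            // First request in a new window: start counting from zero
            currentWindow = window;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}
```

Besides removing the thread, deriving the window from the clock avoids the small drift that builds up when a loop calls Thread.sleep and then resets the counter.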
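Sliding Window Counter

The fixed window counter has an obvious flaw: the boundary problem. Taken separately, neither time window 1 nor time window 2 exceeds the threshold. Yet within the time span marked by the red box in the figure below, which straddles the boundary between the two windows, the number of admitted requests exceeds the threshold.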
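The boundary problem can be reproduced with concrete numbers. The sketch below is a hypothetical demo class, not from the original. It assumes a 1000 ms window with a limit of 5 and passes timestamps in explicitly. Five requests arrive at 900 to 980 ms, at the end of the first window, and five more arrive at 1000 to 1080 ms, at the start of the second.

```java
// Hypothetical demo class: timestamps are passed in explicitly so the scenario is reproducible.
final class FixedWindowBoundaryDemo {
    private final long windowMillis;
    private final int limit;
    private long currentWindow = Long.MIN_VALUE;
    private int count;

    FixedWindowBoundaryDemo(long windowMillis, int limit) {
        this.windowMillis = windowMillis;
        this.limit = limit;
    }

    /** Fixed-window check for a request arriving at nowMillis. */
    boolean tryAcquire(long nowMillis) {
        long window = Math.floorDiv(nowMillis, windowMillis);
        if (window != currentWindow) {
            currentWindow = window; // entering a new window resets the counter
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }

    /** Window 1000 ms, limit 5: returns how many of the ten boundary requests are admitted. */
    static int admittedAcrossBoundary() {
        FixedWindowBoundaryDemo limiter = new FixedWindowBoundaryDemo(1000, 5);
        int admitted = 0;
        for (int i = 0; i < 5; i++) {
            if (limiter.tryAcquire(900 + i * 20)) admitted++;  // end of window [0, 1000)
        }
        for (int i = 0; i < 5; i++) {
            if (limiter.tryAcquire(1000 + i * 20)) admitted++; // start of window [1000, 2000)
        }
        return admitted;
    }
}
```

All ten requests are admitted even though they arrive within 180 ms. That is twice the limit inside a span much shorter than one window.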
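The sliding window counter improves on the fixed window counter. It divides the fixed time window into several smaller sub-windows and counts the requests in each sub-window separately. As time passes, the window keeps discarding expired sub-windows and records new counts in a fresh sub-window, which is where the name "sliding window" comes from. The decision to admit or reject a request is based on the sum of the counts across all sub-windows. The diagram below shows one rate-limiting window divided into 3 sub-windows. The more sub-windows there are, the finer the time granularity of each one, and the more accurate the count.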
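When the window has only just started sliding, not all sub-windows have been filled yet. To keep the implementation simple, it is recommended to always store the count for the current sub-window in the last element of the array, as at times T1 and T2 in the figure above. The implementation is as follows: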
```java
import java.util.Arrays;

public class SlidingWindowCounterLimiter {
    /** Window length in seconds */
    private final int windowSize;
    /** Number of sub-windows (slots) */
    private final int slotNum;
    /** Length of one slot in milliseconds */
    private final long slotTime;
    /** Maximum number of requests allowed in the whole window */
    private final int limit;
    /** Per-slot counters; the last element always holds the current slot */
    private final int[] counters;
    /** Start time of the current (last) slot */
    private long lastTime;

    public SlidingWindowCounterLimiter(int windowSize, int limit) {
        this(windowSize, limit, 10);
    }

    public SlidingWindowCounterLimiter(int windowSize, int limit, int slotNum) {
        this.windowSize = windowSize;
        this.limit = limit;
        this.slotNum = slotNum;
        this.counters = new int[slotNum];
        this.slotTime = windowSize * 1000L / slotNum;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // Number of whole slots that have elapsed since the current slot started
        long slideNum = (currentTime - lastTime) / slotTime;
        slideWindow(slideNum);
        int sum = Arrays.stream(counters).sum();
        // Once the sum has reached the limit, one more request would exceed it
        if (sum >= limit) {
            return false;
        }
        counters[slotNum - 1]++;
        return true;
    }

    /** Slides the window forward by num slots, discarding expired slots. */
    private void slideWindow(long num) {
        if (num == 0) {
            return;
        }
        if (num >= slotNum) {
            // Every slot has expired
            Arrays.fill(counters, 0);
        } else {
            int shift = (int) num;
            // Move the surviving slots to the front and clear the rest
            for (int index = shift; index < slotNum; index++) {
                counters[index - shift] = counters[index];
                counters[index] = 0;
            }
        }
        lastTime += num * slotTime;
    }
}
```
The test code is as follows:
```java
@Test
public void test2() throws Exception {
    // 2-second window, at most 5 requests, 10 slots by default
    SlidingWindowCounterLimiter rateLimiter = new SlidingWindowCounterLimiter(2, 5);

    // First batch: 3 requests, below the limit
    int allNum = 3;
    int passNum = 0;
    int blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);

    // Wait 5 seconds so every slot has expired
    Thread.sleep(5000);

    // Second batch: 14 requests, above the limit
    allNum = 14;
    passNum = 0;
    blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);
}
```
The test results are shown below and match expectations.
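The implementation above shifts the counters array every time the window slides. Here is a sketch of an alternative layout, offered as an assumption rather than the author's design. It stores the slots in a ring buffer indexed by the absolute slot number (the time divided by the slot length), so expired slots are recycled instead of moved. RingSlidingWindowLimiter and its injectable clock are hypothetical names, and the slots are aligned to multiples of the slot length since the epoch.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

// Hypothetical ring-buffer layout: nothing is shifted when the window slides.
final class RingSlidingWindowLimiter {
    private final int limit;           // maximum requests in the whole window
    private final long slotMillis;     // length of one slot
    private final int[] counters;      // request count per ring position
    private final long[] slotIds;      // absolute slot number stored at each ring position
    private final LongSupplier clock;  // time source in milliseconds (injectable for tests)

    RingSlidingWindowLimiter(long windowMillis, int limit, int slotNum, LongSupplier clock) {
        if (slotNum <= 0 || windowMillis < slotNum) {
            throw new IllegalArgumentException("need 0 < slotNum <= windowMillis");
        }
        this.limit = limit;
        this.slotMillis = windowMillis / slotNum;
        this.counters = new int[slotNum];
        this.slotIds = new long[slotNum];
        Arrays.fill(slotIds, Long.MIN_VALUE); // no slot recorded yet
        this.clock = clock;
    }

    synchronized boolean tryAcquire() {
        long slot = Math.floorDiv(clock.getAsLong(), slotMillis); // current absolute slot
        int n = counters.length;
        int sum = 0;
        for (int i = 0; i < n; i++) {
            // Only slots slot-n+1 .. slot belong to the current window
            if (slotIds[i] > slot - n && slotIds[i] <= slot) {
                sum += counters[i];
            }
        }
        if (sum >= limit) {
            return false;
        }
        int idx = (int) Math.floorMod(slot, (long) n);
        if (slotIds[idx] != slot) {
            // This ring position still holds an expired slot: recycle it
            slotIds[idx] = slot;
            counters[idx] = 0;
        }
        counters[idx]++;
        return true;
    }
}
```

Take a limit of 5 per 1000 ms, with bursts of five requests at 900 to 980 ms and at 1000 to 1080 ms. Ten 100 ms slots admit only the first five requests. A single slot (slotNum = 1) behaves like a fixed window and admits all ten.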
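Leaky Bucket

As a Meter Version

A leaky bucket is a bucket into which water can be poured at any rate, while a hole in its bottom lets water drain out at a fixed rate. If water is poured in too quickly, the bucket overflows once the water exceeds its capacity. In rate limiting, incoming requests correspond to water being poured into the bucket. If the bucket is not yet full, the request is admitted; otherwise it is rate-limited. The fixed leak rate can be understood as the rate at which the system is able to process requests.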
The implementation is as follows:
```java
public class LeakyBucketLimiter1 {
    /** Bucket capacity */
    private final long capacity;
    /** Leak rate: units of water drained per second */
    private final long rate;
    /** Current water level (a double, so partial leaks between frequent calls are not lost) */
    private double water;
    /** Time of the last update, in milliseconds */
    private long lastTime;

    public LeakyBucketLimiter1(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.water = 0;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // Water drained since the last call. Computing this in whole seconds while
        // resetting lastTime on every call would lose every sub-second leak.
        double outWater = (currentTime - lastTime) / 1000.0 * rate;
        water = Math.max(0, water - outWater);
        lastTime = currentTime;
        // Admit the request only if one more unit still fits in the bucket
        if (water + 1 <= capacity) {
            water++;
            return true;
        }
        return false;
    }
}
```
The test code is as follows:
```java
@Test
public void test3() throws Exception {
    // Capacity 5, leaks 1 unit per second
    LeakyBucketLimiter1 rateLimiter = new LeakyBucketLimiter1(5, 1);

    // First batch: 3 requests, below the capacity
    int allNum = 3;
    int passNum = 0;
    int blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);

    // Wait 8 seconds so the bucket drains completely
    Thread.sleep(8 * 1000);

    // Second batch: 22 requests, above the capacity
    allNum = 22;
    passNum = 0;
    blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);
}
```
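The test results are shown below and match expectations. All 3 requests in the first batch pass. After the 8-second pause the bucket has drained, so 5 of the 22 requests in the second batch pass and 17 are blocked.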
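Because LeakyBucketLimiter1 reads the system clock, its test has to sleep. Here is a sketch that assumes the caller can supply the timestamp. Driven with explicit times, the same meter makes the water-level arithmetic easy to follow and to test. The class MeterLeakyBucket and its tryAcquire(long nowMillis) signature are hypothetical. The level is kept as a double because computing the leak in whole seconds, while resetting the timestamp on every call, would discard every sub-second leak.

```java
// Hypothetical "as a meter" leaky bucket driven by explicit timestamps.
final class MeterLeakyBucket {
    private final double capacity;      // maximum water level
    private final double leakPerSecond; // fixed drain rate
    private double water;               // current water level
    private long lastMillis;            // time of the previous update

    MeterLeakyBucket(double capacity, double leakPerSecond, long startMillis) {
        this.capacity = capacity;
        this.leakPerSecond = leakPerSecond;
        this.water = 0;
        this.lastMillis = startMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Drain whatever leaked out since the previous call, including fractions of a unit
        double leaked = (nowMillis - lastMillis) / 1000.0 * leakPerSecond;
        water = Math.max(0, water - leaked);
        lastMillis = nowMillis;
        // Each request pours in one unit; admit it only if that unit still fits
        if (water + 1 <= capacity) {
            water += 1;
            return true;
        }
        return false;
    }
}
```

This replays the scenario of test3 without sleeping. The 3 requests at 0 ms all pass. At 8000 ms the bucket is empty again, so 5 of 22 requests pass. At 8500 ms the level is 4.5 and the request is rejected. At 9000 ms the level is 4.0 and the request passes.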
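As a Queue Version

In the As a Meter version, a request is admitted immediately as long as the bucket is not full. The other version, As a Queue, works differently. A request that arrives while the bucket is not full is stored in the bucket temporarily. The stored requests are then released one by one at the bucket's fixed leak rate. Comparing the two versions, the As a Meter version can absorb bursts: as long as the bucket has enough free space, requests are admitted immediately. The As a Queue version only ever releases requests at a fixed rate, so it cannot make full use of the downstream system's processing capacity.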
The implementation is as follows:
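```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LeakyBucketLimiter2 {
    /** Single thread that drains the bucket at a fixed rate */
    private final ScheduledExecutorService threadPool;
    /** The bucket: queued requests waiting to be processed */
    private final ArrayBlockingQueue<UserRequest> queue;
    /** Bucket capacity */
    private final int capacity;
    /** Leak rate: requests released per second */
    private final long rate;

    public LeakyBucketLimiter2(int capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.threadPool = Executors.newSingleThreadScheduledExecutor();
        // Interval between two releases; nanoseconds avoid truncation to 0 when rate > 1000
        long period = TimeUnit.SECONDS.toNanos(1) / rate;
        threadPool.scheduleAtFixedRate(getTask(), 0, period, TimeUnit.NANOSECONDS);
    }

    /** Puts the request into the bucket; returns false if the bucket is full. */
    public boolean tryAcquire(UserRequest userRequest) {
        return queue.offer(userRequest);
    }

    /** Releases at most one queued request per run. */
    private Runnable getTask() {
        return () -> {
            UserRequest userRequest = queue.poll();
            if (userRequest != null) {
                try {
                    userRequest.handle();
                } catch (RuntimeException e) {
                    // An exception escaping the task would cancel all future runs
                    System.out.println("Failed to handle request: " + e.getMessage());
                }
            }
        };
    }

    public static class UserRequest {
        private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
        private final String name;

        public UserRequest(String name) {
            this.name = name;
        }

        public void handle() {
            String timeStr = formatter.format(LocalTime.now());
            String msg = "<" + timeStr + "> " + name + " started processing";
            System.out.println(msg);
        }
    }
}
```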
The test code is as follows:
```java
@Test
public void test4() throws Exception {
    // The bucket holds at most 5 requests and releases 2 per second
    LeakyBucketLimiter2 rateLimiter = new LeakyBucketLimiter2(5, 2);
    int allNum = 7;
    int passNum = 0;
    int blockNum = 0;
    for (int i = 1; i <= allNum; i++) {
        String name = "User request " + i;
        LeakyBucketLimiter2.UserRequest userRequest = new LeakyBucketLimiter2.UserRequest(name);
        if (rateLimiter.tryAcquire(userRequest)) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);

    // Keep the test running so the queued requests are drained and printed
    Thread.sleep(120 * 1000);
}
```
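The test results are shown below and match expectations. The requests accepted into the bucket are processed one at a time, one every 500 ms (2 per second).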
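The fixed output rate of the As a Queue version can also be seen without threads. The following is a hypothetical discrete-time model, not the author's code, built on three assumptions. First, arrival times are sorted. Second, the drain fires at one period, two periods, and so on. This matches the common case in test4, where the scheduled run at time 0 finds the bucket empty. Third, an arrival at the same instant as a tick is queued before the tick fires.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical discrete-time model of the "as a queue" leaky bucket (no threads).
final class QueueLeakyBucketSimulation {

    /** Returns the release time of each request, or -1 if the full bucket rejected it. */
    static long[] releaseTimes(long[] arrivals, int capacity, long periodMillis) {
        long[] release = new long[arrivals.length];
        Deque<Integer> bucket = new ArrayDeque<>(); // indices of queued requests
        long nextTick = periodMillis;               // assumed first drain time
        for (int i = 0; i < arrivals.length; i++) {
            // Every tick before this arrival releases one queued request
            while (nextTick < arrivals[i] && !bucket.isEmpty()) {
                release[bucket.poll()] = nextTick;
                nextTick += periodMillis;
            }
            if (nextTick < arrivals[i]) {
                // Bucket empty: skip the idle ticks up to this arrival
                long idleTicks = (arrivals[i] - nextTick + periodMillis - 1) / periodMillis;
                nextTick += idleTicks * periodMillis;
            }
            if (bucket.size() < capacity) {
                bucket.add(i);
            } else {
                release[i] = -1; // overflow: the request is dropped
            }
        }
        // Drain whatever is left, still one request per period
        while (!bucket.isEmpty()) {
            release[bucket.poll()] = nextTick;
            nextTick += periodMillis;
        }
        return release;
    }
}
```

With the parameters of test4 (capacity 5 and 2 requests per second, so a 500 ms period) and seven simultaneous requests, the model releases the first five at 500, 1000, 1500, 2000 and 2500 ms and drops the last two. The fifth request waits 2.5 s even if the downstream system is idle, which is the limitation described for this version.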
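Token Bucket

The basic principle of the token bucket is simple. Tokens are added to the bucket at a fixed rate until the bucket reaches its capacity. Each request first tries to take a token from the bucket. If a token is available, the request is admitted; otherwise it is rate-limited. In fact, the token bucket and the As a Meter leaky bucket are two sides of the same coin. In the former, requests consume tokens; in the latter, requests pour in water. They are essentially the same mechanism seen from different perspectives. The implementation is as follows: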
```java
public class TokenBucketLimiter {
    /** Bucket capacity (maximum number of stored tokens) */
    private final long capacity;
    /** Refill rate: tokens added per second */
    private final long rate;
    /** Tokens currently available (a double, so partial refills between frequent calls are kept) */
    private double tokens;
    /** Time of the last refill, in milliseconds */
    private long lastTime;

    public TokenBucketLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        // The bucket starts full
        this.tokens = capacity;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // Tokens generated since the last call. Computing this in whole seconds while
        // resetting lastTime on every call would never refill a frequently polled bucket.
        double newTokenNum = (currentTime - lastTime) / 1000.0 * rate;
        tokens = Math.min(capacity, tokens + newTokenNum);
        lastTime = currentTime;
        // A request needs one whole token
        if (tokens >= 1) {
            tokens--;
            return true;
        }
        return false;
    }
}
```
The test code is as follows:
```java
@Test
public void test5() throws Exception {
    // Capacity 5, 1 token per second, the bucket starts full
    TokenBucketLimiter rateLimiter = new TokenBucketLimiter(5, 1);

    // First batch: 3 requests, below the capacity
    int allNum = 3;
    int passNum = 0;
    int blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);

    // Wait 8 seconds so the bucket refills
    Thread.sleep(8 * 1000);

    // Second batch: 22 requests, above the capacity
    allNum = 22;
    passNum = 0;
    blockNum = 0;
    for (int i = 0; i < allNum; i++) {
        if (rateLimiter.tryAcquire()) {
            passNum++;
        } else {
            blockNum++;
        }
    }
    System.out.println("Total requests: " + allNum + ", passed: " + passNum + ", blocked: " + blockNum);
}
```
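The test results are shown below and match expectations. The bucket starts full, so all 3 requests in the first batch pass. After the 8-second pause it is full again with 5 tokens, so 5 of the 22 requests in the second batch pass and 17 are blocked.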
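The claim that the token bucket and the As a Meter leaky bucket are two sides of the same coin can be checked directly. Define the token count as tokens = capacity - water. Refilling, min(capacity, tokens + rate * dt), then equals capacity - max(0, water - rate * dt), which is leaking. Admitting a request when tokens >= 1 is likewise the same as admitting it when water + 1 <= capacity. The sketch below, a hypothetical class with explicit timestamps, runs both buckets on the same arrivals. The timestamps are chosen so that rate * dt is exactly representable as a double. With arbitrary timestamps, floating-point rounding could in principle make the two disagree at an exact boundary.

```java
// Hypothetical side-by-side model: both buckets take explicit timestamps.
final class BucketDuality {

    /** Token bucket: refills at ratePerSecond up to capacity; a request takes one token. */
    static final class TokenBucket {
        private final double capacity;
        private final double ratePerSecond;
        private double tokens;
        private long lastMillis;

        TokenBucket(double capacity, double ratePerSecond, long startMillis) {
            this.capacity = capacity;
            this.ratePerSecond = ratePerSecond;
            this.tokens = capacity; // starts full
            this.lastMillis = startMillis;
        }

        boolean tryAcquire(long nowMillis) {
            tokens = Math.min(capacity, tokens + (nowMillis - lastMillis) / 1000.0 * ratePerSecond);
            lastMillis = nowMillis;
            if (tokens >= 1) {
                tokens -= 1;
                return true;
            }
            return false;
        }
    }

    /** Meter-style leaky bucket: drains at ratePerSecond; a request adds one unit of water. */
    static final class LeakyMeter {
        private final double capacity;
        private final double ratePerSecond;
        private double water;
        private long lastMillis;

        LeakyMeter(double capacity, double ratePerSecond, long startMillis) {
            this.capacity = capacity;
            this.ratePerSecond = ratePerSecond;
            this.water = 0; // starts empty
            this.lastMillis = startMillis;
        }

        boolean tryAcquire(long nowMillis) {
            water = Math.max(0, water - (nowMillis - lastMillis) / 1000.0 * ratePerSecond);
            lastMillis = nowMillis;
            if (water + 1 <= capacity) {
                water += 1;
                return true;
            }
            return false;
        }
    }

    /** Feeds the same arrivals to both buckets; row 0 = token bucket, row 1 = leaky meter. */
    static boolean[][] decide(long[] arrivals, double capacity, double ratePerSecond) {
        TokenBucket tokenBucket = new TokenBucket(capacity, ratePerSecond, 0);
        LeakyMeter leakyMeter = new LeakyMeter(capacity, ratePerSecond, 0);
        boolean[][] decisions = new boolean[2][arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            decisions[0][i] = tokenBucket.tryAcquire(arrivals[i]);
            decisions[1][i] = leakyMeter.tryAcquire(arrivals[i]);
        }
        return decisions;
    }
}
```

Take capacity 3, 2 tokens per second, and arrivals at 0, 0, 0, 0, 250, 500, 500, 1500, 1500, 1500 and 1500 ms. Both buckets admit the first three requests and reject the fourth and the one at 250 ms. They admit one of the two at 500 ms and two of the four at 1500 ms.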
References

Zhou Zhiming. 凤凰架构 (Phoenix Architecture).