这里对Java中的阻塞队列及其常见实现进行介绍
楔子
在多线程环境下实现一个线程安全的队列,大体可分为两种思路:基于阻塞机制的、基于非阻塞机制的。后者通过CAS算法等手段以避免发生阻塞,典型地实现有ConcurrentLinkedQueue、ConcurrentLinkedDeque;前者则是通过锁的方式来保证线程安全,其会在队列已满、队列为空时分别阻塞生产者、消费者。具体地,Java中则是提供了一个BlockingQueue阻塞队列接口并提供相应的实现类
BlockingQueue接口
BlockingQueue接口通过继承Queue接口,实现了对传统队列操作方式的补充、增强。新增了阻塞、超时两种形式的队列操作方式。如下表所示
队列操作 |
抛异常 |
返回特殊值 |
阻塞 |
支持超时 |
入队 |
add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
出队 |
remove() |
poll() |
take() |
poll(time, unit) |
查看队首元素 |
element() |
peek() |
N/A |
N/A |
前两种形式(抛异常、返回特殊值)与Queue接口一致,当队列已满添加元素失败时,会分别抛出异常、返回特殊值false;当队列为空时,进行移除元素或查看队首元素时,则会分别抛出异常、返回特殊值null。对于阻塞形式而言,其针对入队、出队操作分别定义了put、take方法。当生产者线程向一个已满队列通过put方法添加元素时,则其自身将会被阻塞直到队列不为满;类似地,对于消费者的task方法而言同理,此处不再赘述。对于支持超时形式而言,其重载了原有的offer、poll方法,增加了对超时参数的支持。最后对于Java阻塞队列来说,即BlockingQueue接口的实现类均不支持null值元素
ArrayBlockingQueue
其是一个基于数组的阻塞队列,底层使用数组进行元素的存储。创建该阻塞队列实例需要指定队列容量,故其是一个有界队列。在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制,换言之生产者线程与消费者线程间无法同时操作
LinkedBlockingQueue
其是一个基于链表的阻塞队列,底层使用链表进行元素的存储。该阻塞队列容量默认为 Integer.MAX_VALUE,即如果未显式设置队列容量时可以视为是一个无界队列;反之构建实例过程中指定队列容量,则其就是一个有界队列。在并发控制层面,其使用了两个ReentrantLock可重入锁来分别控制对入队、出队这两种类型的操作。使得生产者线程与消费者线程间可以同时操作提高效率。特别地对于链表这种结构而言,Java还提供了一个实现BlockingDeque接口的LinkedBlockingDeque类——其是一个基于链表的双向阻塞队列
PriorityBlockingQueue
提到优先级队列,我们会想到PriorityQueue,但其由于不是线程安全的,故无法在多线程环境下使用。为此Java提供了一个线程安全版本的优先级队列PriorityBlockingQueue,其是一个支持优先级的无界阻塞队列。底层使用数组实现元素的存储、最小堆的表示。默认使用元素的自然排序,即要求元素实现Comparable接口;或者显式指定比较器Comparator。在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制。值得一提的是,在创建该队列实例时虽然可以指定容量。但这并不是队列的最终容量,而只是该队列实例的初始容量。一旦后续过程队列容量不足,其会自动进行扩容。值得一提的是,为了保证同时只有一个线程进行扩容,其内部是通过CAS方式来实现的,而不是利用ReentrantLock可重入锁来控制。故PriorityBlockingQueue是一个无界队列。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Test public void test1() { BlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>(2); blockingQueue.offer(13); blockingQueue.offer(5); blockingQueue.offer(7);
Integer size = blockingQueue.size(); System.out.println("blockingQueue: " + blockingQueue + ", size: " + size);
Integer e1 = blockingQueue.poll(); System.out.println("e1: " + e1);
Integer e2 = blockingQueue.poll(); System.out.println("e2: " + e2);
Integer e3 = blockingQueue.poll(); System.out.println("e3: " + e3); }
|
测试结果如下所示
DelayQueue
延迟队列,一个无界的阻塞队列。顾名思义,元素只有到了其指定的延迟时间才能出队,否则消费者线程调用take方法会被一直阻塞。其底层使用PriorityQueue实现元素的存储,使用一个ReentrantLock实现线程同步。该队列中的元素在实现Delayed接口时需要同时实现getDelay、compareTo方法。前者用于计算元素当前剩余的延迟时间;后者用于实现延迟时间按从小到大进行排序,以保证队头元素是延迟时间最小的。这里我们以缓存数据为场景进行实践,当缓存到期后即可被从队列中移除。示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| public class BlockingQueueTest {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
@Test public void test2() throws Exception { BlockingQueue<Cache> blockingQueue = new DelayQueue<>();
new Thread(() -> { while (true) { try { Cache cache = blockingQueue.take(); info("消费者: " + cache.toString()); } catch (Exception e) { System.out.println("Happen Exception: " + e.getMessage()); } } }).start();
Long timeStamp = System.currentTimeMillis(); Cache cache1 = new Cache("name", "Aaron", timeStamp + 15 * 1000); blockingQueue.put(cache1);
Cache cache2 = new Cache("age", "18", timeStamp + 27 * 1000); blockingQueue.put(cache2);
Cache cache3 = new Cache("country", "China", timeStamp + 7 * 1000); blockingQueue.put(cache3);
Thread.sleep(120 * 1000); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String thread = Thread.currentThread().getName(); String log = "[" + time + "] " + msg; System.out.println(log); }
@AllArgsConstructor @Data private static class Cache implements Delayed { private String key;
private String value;
private Long expire;
@Override public long getDelay(TimeUnit unit) { long delta = expire - System.currentTimeMillis(); return unit.convert(delta, TimeUnit.MILLISECONDS); }
@Override public int compareTo(Delayed o) { Cache other = (Cache) o; return this.getExpire().compareTo(other.getExpire()); }
@Override public String toString() { Date time = new Date(expire); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); String timeStr = formatter.format(time); return "Cache, key: " + key + ", expire: " + timeStr; } }
}
|
测试结果如下所示
SynchronousQueue
其是一个同步队列。特别地是由于该队列没有容量无法存储元素,故生产者添加的数据会直接被消费者获取并且立刻消费。所以当生产者线程添加数据时,如果此时恰好有一个消费者已经准备好获取队头元素了,则会添加成功;否则要么添加失败返回false要么被阻塞。通过Executors.newCachedThreadPool()创建的线程池实例,其内部任务队列使用的就是SynchronousQueue,故offer方法添加任务到队列失败后则会开启新的线程来进行处理。关于同步队列的这一特性,通过下面的示例可以帮助我们更好的理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Test public void test3() { BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>(); Boolean b1 = blockingQueue.offer(237); info("生产者 b1: " + b1);
new Thread( ()->{ try{ Integer e = blockingQueue.take(); info("消费者:" + e); } catch (Exception e) { info("Happen Exception: " + e.getMessage()); } } ).start();
try { Thread.sleep(2000); } catch (Exception e) {} Boolean b2 = blockingQueue.offer(996); info("生产者 b2: " + b2);
try { Thread.sleep(120*1000); } catch (Exception e) {} }
|
测试结果如下,符合预期。生产者第一次添加元素结果失败,原因很简单。因为同步队列没有存储元素的能力,故如果没有消费者直接取走,则生产者即会添加失败;第二次添加时,消费者线程已经在阻塞等待了,故添加成功
下面我们利用阻塞的put方法来添加元素,示例代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Test public void test4() { BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
new Thread(() -> { try { info("生产者: Start"); while (true) { Integer num = RandomUtil.randomInt(1, 100); info("生产者: put " + num); blockingQueue.put(num); } } catch (Exception e) { info("Happen Exception: " + e.getMessage()); } }).start();
new Thread(() -> { try { info("消费者: Start"); while (true) { try { Thread.sleep(5000); } catch (Exception e) { } Integer e = blockingQueue.take(); info("消费者: " + e); } } catch (Exception e) { info("Happen Exception: " + e.getMessage()); } }).start();
try { Thread.sleep(120 * 1000); } catch (Exception e) {} }
|
从测试结果中的时间戳,可以很明显看出只有当消费者取出元素,生产者线程的put方法才会结束阻塞
参考文献
- Java并发编程之美 翟陆续、薛宾田著