0%

Java多线程之阻塞队列

这里对Java中的阻塞队列及其常见实现进行介绍

 abstract.jpeg

楔子

在多线程环境下实现一个线程安全的队列,大体可分为两种思路:基于阻塞机制的、基于非阻塞机制的。后者通过CAS算法等手段以避免发生阻塞,典型地实现有ConcurrentLinkedQueue、ConcurrentLinkedDeque;前者则是通过锁的方式来保证线程安全,其会在队列已满、队列为空时分别阻塞生产者、消费者。具体地,Java中则是提供了一个BlockingQueue阻塞队列接口并提供相应的实现类

BlockingQueue接口

BlockingQueue接口通过继承Queue接口,实现了对传统队列操作方式的补充、增强。新增了阻塞、超时两种形式的队列操作方式。如下表所示

队列操作 抛异常 返回特殊值 阻塞 支持超时
入队 add(e) offer(e) put(e) offer(e, time, unit)
出队 remove() poll() take() poll(time, unit)
查看队首元素 element() peek() N/A N/A

前两种形式(抛异常、返回特殊值)与Queue接口一致,当队列已满添加元素失败时,会分别抛出异常、返回特殊值false;当队列为空时,进行移除元素或查看队首元素时,则会分别抛出异常、返回特殊值null。对于阻塞形式而言,其针对入队、出队操作分别定义了put、take方法。当生产者线程向一个已满队列通过put方法添加元素时,则其自身将会被阻塞直到队列不为满;类似地,对于消费者的task方法而言同理,此处不再赘述。对于支持超时形式而言,其重载了原有的offer、poll方法,增加了对超时参数的支持。最后对于Java阻塞队列来说,即BlockingQueue接口的实现类均不支持null值元素

ArrayBlockingQueue

其是一个基于数组的阻塞队列,底层使用数组进行元素的存储。创建该阻塞队列实例需要指定队列容量,故其是一个有界队列。在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制,换言之生产者线程与消费者线程间无法同时操作

LinkedBlockingQueue

其是一个基于链表的阻塞队列,底层使用链表进行元素的存储。该阻塞队列容量默认为 Integer.MAX_VALUE,即如果未显式设置队列容量时可以视为是一个无界队列;反之构建实例过程中指定队列容量,则其就是一个有界队列。在并发控制层面,其使用了两个ReentrantLock可重入锁来分别控制对入队、出队这两种类型的操作。使得生产者线程与消费者线程间可以同时操作提高效率。特别地对于链表这种结构而言,Java还提供了一个实现BlockingDeque接口的LinkedBlockingDeque类——其是一个基于链表的双向阻塞队列

PriorityBlockingQueue

提到优先级队列,我们会想到PriorityQueue,但其由于不是线程安全的,故无法在多线程环境下使用。为此Java提供了一个线程安全版本的优先级队列PriorityBlockingQueue,其是一个支持优先级的无界阻塞队列。底层使用数组实现元素的存储、最小堆的表示。默认使用元素的自然排序,即要求元素实现Comparable接口;或者显式指定比较器Comparator。在并发控制层面,无论是入队还是出队操作,均使用同一个ReentrantLock可重入锁进行控制。值得一提的是,在创建该队列实例时虽然可以指定容量。但这并不是队列的最终容量,而只是该队列实例的初始容量。一旦后续过程队列容量不足,其会自动进行扩容。值得一提的是,为了保证同时只有一个线程进行扩容,其内部是通过CAS方式来实现的,而不是利用ReentrantLock可重入锁来控制。故PriorityBlockingQueue是一个无界队列。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void test1() {
BlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>(2);
blockingQueue.offer(13);
blockingQueue.offer(5);
blockingQueue.offer(7);

Integer size = blockingQueue.size();
System.out.println("blockingQueue: " + blockingQueue + ", size: " + size);

Integer e1 = blockingQueue.poll();
System.out.println("e1: " + e1);

Integer e2 = blockingQueue.poll();
System.out.println("e2: " + e2);

Integer e3 = blockingQueue.poll();
System.out.println("e3: " + e3);
}

测试结果如下所示

figure 1.jpeg

DelayQueue

延迟队列,一个无界的阻塞队列。顾名思义,元素只有到了其指定的延迟时间才能出队,否则消费者线程调用take方法会被一直阻塞。其底层使用PriorityQueue实现元素的存储,使用一个ReentrantLock实现线程同步。该队列中的元素在实现Delayed接口时需要同时实现getDelay、compareTo方法。前者用于计算元素当前剩余的延迟时间;后者用于实现延迟时间按从小到大进行排序,以保证队头元素是延迟时间最小的。这里我们以缓存数据为场景进行实践,当缓存到期后即可被从队列中移除。示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class BlockingQueueTest {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");

@Test
public void test2() throws Exception {
BlockingQueue<Cache> blockingQueue = new DelayQueue<>();

new Thread(() -> {
while (true) {
try {
Cache cache = blockingQueue.take();
info("消费者: " + cache.toString());
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
}).start();

Long timeStamp = System.currentTimeMillis();
Cache cache1 = new Cache("name", "Aaron", timeStamp + 15 * 1000);
blockingQueue.put(cache1);

Cache cache2 = new Cache("age", "18", timeStamp + 27 * 1000);
blockingQueue.put(cache2);

Cache cache3 = new Cache("country", "China", timeStamp + 7 * 1000);
blockingQueue.put(cache3);

Thread.sleep(120 * 1000);
}

/**
* 打印信息
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "[" + time + "] " + msg;
System.out.println(log);
}

@AllArgsConstructor
@Data
private static class Cache implements Delayed {
// 缓存 Key
private String key;

// 缓存 Value
private String value;

// 缓存到期时间
private Long expire;

/**
* 计算当前延迟时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 缓存有效的剩余毫秒数
long delta = expire - System.currentTimeMillis();
return unit.convert(delta, TimeUnit.MILLISECONDS);
}

/**
* 定义比较规则, 延迟时间按从小到大进行排序
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
Cache other = (Cache) o;
return this.getExpire().compareTo(other.getExpire());
}

@Override
public String toString() {
Date time = new Date(expire);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String timeStr = formatter.format(time);
return "Cache, key: " + key + ", expire: " + timeStr;
}
}

}

测试结果如下所示

figure 2.jpeg

SynchronousQueue

其是一个同步队列。特别地是由于该队列没有容量无法存储元素,故生产者添加的数据会直接被消费者获取并且立刻消费。所以当生产者线程添加数据时,如果此时恰好有一个消费者已经准备好获取队头元素了,则会添加成功;否则要么添加失败返回false要么被阻塞。通过Executors.newCachedThreadPool()创建的线程池实例,其内部任务队列使用的就是SynchronousQueue,故offer方法添加任务到队列失败后则会开启新的线程来进行处理。关于同步队列的这一特性,通过下面的示例可以帮助我们更好的理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void test3() {
BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
Boolean b1 = blockingQueue.offer(237);
info("生产者 b1: " + b1);

// 消费者线程
new Thread( ()->{
try{
Integer e = blockingQueue.take();
info("消费者:" + e);
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
}
} ).start();

// 确保消费者线程已经准备完毕
try { Thread.sleep(2000); } catch (Exception e) {}
Boolean b2 = blockingQueue.offer(996);
info("生产者 b2: " + b2);

try { Thread.sleep(120*1000); } catch (Exception e) {}
}

测试结果如下,符合预期。生产者第一次添加元素结果失败,原因很简单。因为同步队列没有存储元素的能力,故如果没有消费者直接取走,则生产者即会添加失败;第二次添加时,消费者线程已经在阻塞等待了,故添加成功

figure 3.jpeg

下面我们利用阻塞的put方法来添加元素,示例代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Test
public void test4() {
BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();

// 生产者线程
new Thread(() -> {
try {
info("生产者: Start");
while (true) {
Integer num = RandomUtil.randomInt(1, 100);
info("生产者: put " + num);
blockingQueue.put(num);
}
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
}
}).start();

// 消费者线程
new Thread(() -> {
try {
info("消费者: Start");
while (true) {
try {
Thread.sleep(5000);
} catch (Exception e) {
}
Integer e = blockingQueue.take();
info("消费者: " + e);
}
} catch (Exception e) {
info("Happen Exception: " + e.getMessage());
}
}).start();

try { Thread.sleep(120 * 1000); } catch (Exception e) {}
}

从测试结果中的时间戳,可以很明显看出只有当消费者取出元素,生产者线程的put方法才会结束阻塞

figure 4.jpeg

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝