0%

Java多线程之Semaphore信号量

这里就JUC包中的Semaphore类做相关介绍

abstract.jpeg

概述

JUC包中的Semaphore信号量作为一个并发工具类。其基本思想很简单,对于一个信号量实例而言,其含有指定数量的许可。每当访问资源前,需先向其申请许可。并在处理完毕后释放许可,以供后续申请。其实,这个使用方式就很像现实世界的停车场,即停车场有空余车位,车才可以进车;否则要么等待要么离开(寻找下一个停车场)。当车从停车场的车位驶离时,则会将相应的车位就会空余出来。在整个过程停车场的车位资源是有限的固定的。常见的使用场景是对业务所使用的线程数进行控制,即所谓基于线程数的限流方式。其常用方法及功能如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 创建一个指定许可数的非公平信号量
public Semaphore(int permits);

// 创建一个指定许可数的公平/非公平信号量
public Semaphore(int permits, boolean fair);

// 释放一个许可
public void release();

// 释放指定数量的许可
public void release(int permits);

// 当前剩余可用的许可数量
public int availablePermits();

/*************************** 获取许可 ******************************/

// 阻塞等待,直到获取一个许可
public void acquire() throws InterruptedException;

// 阻塞等待,直到获取全部所需数量的许可
public void acquire(int permits) throws InterruptedException;

// 阻塞等待(忽略InterruptedException异常),直到获取一个许可
public void acquireUninterruptibly();

// 阻塞等待(忽略InterruptedException异常),直到获取全部所需数量的许可
public void acquireUninterruptibly(int permits);

// 非阻塞式获取一个许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire();

// 非阻塞式获取全部所需数量的许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(int permits);

// 支持超时机制的tryAcquire方法, 获取一个许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;

// 支持超时机制的tryAcquire方法, 获取全部所需数量的许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;

// 一次性获取所有剩余可用的许可, 返回成功获取的许可数
public int drainPermits();

/******************************************************************/

可以看到,对于信号量而言,其支持公平和非公平两种类型。默认为非公平的。值得一提的是,对于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的许可,会立即分配给它。不论是否存在其他正在等待许可的线程。即使当前这个信号量实例是公平的,换言之tryAcquire()方法会破坏公平信号量实例的公平性。如果既期望使用非阻塞方式,又期望不破坏公平信号量的公平性,可以使用它的超时机制版本,同时将超时时间设为0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此处不再赘述

基本实践

这里通过一个简单的实例,来进行展示其基本的使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class SemaphoreTest {

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");

// 系统最大的并发处理量
private static Integer maxLimit = 5;

@Test
public void test1() {
System.out.println("---------------------- 系统上线 ----------------------");
Semaphore semaphore = new Semaphore(maxLimit, true);
ExecutorService threadPool = Executors.newFixedThreadPool(10);

IntStream.rangeClosed(1,8)
.mapToObj( num -> new UserReq("用户#"+num, semaphore) )
.forEach( threadPool::execute );

// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}


/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String log = "["+time+"] "+ msg;
System.out.println(log);
}

@AllArgsConstructor
private static class UserReq implements Runnable{

private String name;

private Semaphore semaphore;

@Override
public void run() {
// 模拟用户不定时发起请求
try{ Thread.sleep(RandomUtils.nextLong(500, 2000)); } catch (Exception e) {}
String msg = name + ": 发起请求, 系统可用资源数: " + semaphore.availablePermits();
info(msg);

// 阻塞等待,直到获取许可
try {
semaphore.acquire();
}catch (InterruptedException e) {
System.out.println( "Happen Exception: " + e.getMessage());
}

info(name + ": 系统开始处理请求");
// 模拟业务耗时
try{ Thread.sleep(RandomUtils.nextInt(5, 20)*1000); } catch (Exception e) {}

// 用户请求处理完毕,释放许可
semaphore.release();
info(name + ": 系统处理完毕");
}
}
}

测试结果如下,符合预期

figure 1.jpeg

实现原理

构造器

Semaphore信号量类的实现过程同样依赖于AQS。具体地,其是对AQS中共享锁的使用。在构建Semaphore实例过程时,一方面,通过sync变量持有AQS的实现类Sync,同时按公平性与否进一步地可细分为NonfairSync、FairSync;另一方面,通过AQS的state字段来存储许可的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Semaphore implements java.io.Serializable {

private final Sync sync;

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}

static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}

static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
}

}

acquire方法

首先来看Semaphore的acquire()方法。其委托sync调用AQS的acquireSharedInterruptibly方法。而在AQS中通过调用tryAcquireShared方法判断是否需要阻塞调用线程。具体地,在Semaphore的NonfairSync、FairSync内部类分别实现了该tryAcquireShared方法的两个版本:非公平、公平。可以看到两种实现基本一致。tryAcquireShared如果返回负值,则说明当前许可数不够,当前线程需要进入AQS阻塞队列;反之则获取成功。只是在公平版本的实现中,会调用AQS的hasQueuedPredecessors方法来判断是否有其他线程已经在AQS队列中进行排队。如果有,则tryAcquireShared直接返回-1,即当前调用线程放弃获取,转而准备进入AQS队列以保障公平性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class Semaphore implements java.io.Serializable {

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平信号量获取许可
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

static final class FairSync extends Sync {
// 公平信号量获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
// 对于公平性实现而言, 如果AQS队列存在排队的节点
// 则直接返回-1, 即进入AQS队列进行排队以保证公平性
return -1;
// 通过访问AQS的state字段, 获取当前可用的许可数量
int available = getState();
// 计算剩余可用的许可数量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 线程被中断则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();

if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// 需要子类去实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
}

release方法

Semaphore的release()方法类似。其同样是委托sync调用AQS的releaseShared方法。然后AQS执行tryReleaseShared方法,如果该方法返回true,则会进一步调用AQS的doReleaseShared方法来唤醒AQS队列中其他线程。可以看到在Semaphore的Sync内部类中,tryReleaseShared总是会返回true。其实现过程也很简单,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Semaphore implements java.io.Serializable {

public void release() {
sync.releaseShared(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 通过访问AQS的state字段, 获取当前可用的许可数量
int current = getState();
// 将释放的许可数添加到当前可用许可数量上
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通过CAS的方式更新state字段
if (compareAndSetState(current, next))
return true;
}
}
}

}

...

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

// 需要子类去实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
}

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
请我喝杯咖啡捏~
  • 本文作者: Aaron Zhu
  • 本文链接: https://xyzghio.xyz/Semaphore/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-ND 许可协议。转载请注明出处!

欢迎关注我的微信公众号:青灯抽丝