这里就JUC包中的Semaphore类做相关介绍
概述
JUC包中的Semaphore信号量作为一个并发工具类。其基本思想很简单,对于一个信号量实例而言,其含有指定数量的许可。每当访问资源前,需先向其申请许可。并在处理完毕后释放许可,以供后续申请。其实,这个使用方式就很像现实世界的停车场,即停车场有空余车位,车才可以进车;否则要么等待要么离开(寻找下一个停车场)。当车从停车场的车位驶离时,则会将相应的车位就会空余出来。在整个过程停车场的车位资源是有限的固定的。常见的使用场景是对业务所使用的线程数进行控制,即所谓基于线程数的限流方式。其常用方法及功能如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public Semaphore(int permits);
public Semaphore(int permits, boolean fair);
public void release();
public void release(int permits);
public int availablePermits();
public void acquire() throws InterruptedException;
public void acquire(int permits) throws InterruptedException;
public void acquireUninterruptibly();
public void acquireUninterruptibly(int permits);
public boolean tryAcquire();
public boolean tryAcquire(int permits);
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;
public int drainPermits();
|
可以看到,对于信号量而言,其支持公平和非公平两种类型。默认为非公平的。值得一提的是,对于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的许可,会立即分配给它。不论是否存在其他正在等待许可的线程。即使当前这个信号量实例是公平的,换言之tryAcquire()方法会破坏公平信号量实例的公平性。如果既期望使用非阻塞方式,又期望不破坏公平信号量的公平性,可以使用它的超时机制版本,同时将超时时间设为0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此处不再赘述
基本实践
这里通过一个简单的实例,来进行展示其基本的使用流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class SemaphoreTest {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
private static Integer maxLimit = 5;
@Test public void test1() { System.out.println("---------------------- 系统上线 ----------------------"); Semaphore semaphore = new Semaphore(maxLimit, true); ExecutorService threadPool = Executors.newFixedThreadPool(10);
IntStream.rangeClosed(1,8) .mapToObj( num -> new UserReq("用户#"+num, semaphore) ) .forEach( threadPool::execute );
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {} System.out.println("---------------------- 系统下线 ----------------------"); }
private static void info(String msg) { String time = formatter.format(LocalTime.now()); String log = "["+time+"] "+ msg; System.out.println(log); }
@AllArgsConstructor private static class UserReq implements Runnable{
private String name;
private Semaphore semaphore;
@Override public void run() { try{ Thread.sleep(RandomUtils.nextLong(500, 2000)); } catch (Exception e) {} String msg = name + ": 发起请求, 系统可用资源数: " + semaphore.availablePermits(); info(msg);
try { semaphore.acquire(); }catch (InterruptedException e) { System.out.println( "Happen Exception: " + e.getMessage()); }
info(name + ": 系统开始处理请求"); try{ Thread.sleep(RandomUtils.nextInt(5, 20)*1000); } catch (Exception e) {}
semaphore.release(); info(name + ": 系统处理完毕"); } } }
|
测试结果如下,符合预期
实现原理
构造器
Semaphore信号量类的实现过程同样依赖于AQS。具体地,其是对AQS中共享锁的使用。在构建Semaphore实例过程时,一方面,通过sync变量持有AQS的实现类Sync,同时按公平性与否进一步地可细分为NonfairSync、FairSync;另一方面,通过AQS的state字段来存储许可的数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Semaphore implements java.io.Serializable {
private final Sync sync;
public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } }
static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } }
static final class FairSync extends Sync { FairSync(int permits) { super(permits); } }
}
|
acquire方法
首先来看Semaphore的acquire()方法。其委托sync调用AQS的acquireSharedInterruptibly方法。而在AQS中通过调用tryAcquireShared方法判断是否需要阻塞调用线程。具体地,在Semaphore的NonfairSync、FairSync内部类分别实现了该tryAcquireShared方法的两个版本:非公平、公平。可以看到两种实现基本一致。tryAcquireShared如果返回负值,则说明当前许可数不够,当前线程需要进入AQS阻塞队列;反之则获取成功。只是在公平版本的实现中,会调用AQS的hasQueuedPredecessors方法来判断是否有其他线程已经在AQS队列中进行排队。如果有,则tryAcquireShared直接返回-1,即当前调用线程放弃获取,转而准备进入AQS队列以保障公平性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class Semaphore implements java.io.Serializable {
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
static final class NonfairSync extends Sync { protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
static final class FairSync extends Sync { protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } }
|
release方法
Semaphore的release()方法类似。其同样是委托sync调用AQS的releaseShared方法。然后AQS执行tryReleaseShared方法,如果该方法返回true,则会进一步调用AQS的doReleaseShared方法来唤醒AQS队列中其他线程。可以看到在Semaphore的Sync内部类中,tryReleaseShared总是会返回true。其实现过程也很简单,如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class Semaphore implements java.io.Serializable {
public void release() { sync.releaseShared(1); }
abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } }
}
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } }
|
参考文献
- Java并发编程之美 翟陆续、薛宾田著