JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时 Lock 锁的频繁操作。这三种辅助类为:
- CountDownLatch: 减少计数
- CyclicBarrier: 循环栅栏
- Semaphore: 信号灯
一、减少计数 CountDownLatch
1. 概述
该类的构造方法为 CountDownLatch(int count)
构造一个用给定计数初始化的 CountDownLatch
在这里插入代码片
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。具体步骤可以演化为定义一个类,减1操作,并等待到0,为0执行结果。
两个常用的主要方法
- await() 使当前线程在锁存器倒计数至零之前一直在等待,除非线程被中断
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
- countDown()递减锁存器的计数,如果计数达到零,将释放所有等待的线程
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
2. 案例演示
6个同学陆续离开教室之后,班长才能锁门
演示没有CountDownLatch的情况
如果不加 CountDownLatch类,会出现线程混乱执行,同学还未离开教室班长就已经锁门了
public class NoCountDownLatch {
//6个同学陆续离开教室之后,班长锁门
public static void main(String[] args) throws InterruptedException {
// 创建六个线程,模拟六个学生
for (int i = 1; i <=6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"离开教室");
},String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName()+"锁门");
}
}
运行结果:
5离开教室
main锁门
2离开教室
1离开教室
6离开教室
3离开教室
4离开教室
添加CountDownLatch
public class CountDownLatchDemo {
//6个同学陆续离开教室之后,班长锁门
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 号同学离开了教室");
// 计数器减1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 阻塞,等待计数器为0后,继续往下执行
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "执行了关门操作");
}
}
运行结果:
2 号同学离开了教室
5 号同学离开了教室
6 号同学离开了教室
1 号同学离开了教室
4 号同学离开了教室
3 号同学离开了教室
main执行了关门操作
二、循环栅栏 CyclicBarrier
1. 概述
CyclicBarrier 字面意思是环栅栏,是 JUC 下的一个并发工具,跟 CountDownLatch 很相似,都可以使线程先等待然后再执行,但是它的功能比 CountDownLatch 更加复杂和强大, CountDownLatch 是一个或者多个线程等待另外一批线程执行完毕后,在接着执行,而 CyclicBarrier 是等待一批线程到达某个状态之后再同时开始执行,回环的意思是当所有的线程被释放后,CyclicBarrier 可以被重启,也就是可以重复使用。
常用的构造方法有:CyclicBarrier(int parties,Runnable barrierAction) ,其底层代码如下
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
// 最后需要执行的方法
this.barrierCommand = barrierAction;
}
创建一个新的CyclicBarrier,它将在 给定数量的参与者 (线程)处于等待状态时启动,并在启动barrier时执行 给定的屏障操作,该操作由最后一个进入barrier的线程操作
常用的方法有:
await() 在所有的参与者都已经在此barrier上调用await方法之前一直等待。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 同步锁
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 每次执行 CyclicBarrier 一次障碍数会加一,距离目标障碍数-1
int index = --count;
// 当达到目标障碍数执行if内代码
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 执行Runnable代码
command.run();
ranAction = true;
nextGeneration();
// 返回0
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 循环,直到触发、中断、中断或超时
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
从底层代码可知, CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会+1,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。可以将 CyclicBarrier 理解为+1 操作(指与目标障碍数的距离)
2. 案例演示
代码案例:集齐7颗龙珠就可以召唤神龙
public class CyclicBarrirtTest {
private static Integer NUMBER = 7;
public static void main(String[] args) {
// 每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> System.out.println("集齐7颗龙珠,可以召唤神龙了"));
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 号龙珠已收集");
try {
// 执行 await() 方法,障碍数 +1
cyclicBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}, String.valueOf(i)).start();
}
}
}
执行结果
5 号龙珠已收集
1 号龙珠已收集
4 号龙珠已收集
2 号龙珠已收集
6 号龙珠已收集
3 号龙珠已收集
7 号龙珠已收集
集齐7颗龙珠,可以召唤神龙了
三、信号灯 Semaphore
1. 概述
信号量(Semaphore),又被称为信号灯,在多线程环境下用于协调各个线程, 以保证它们能够正确、合理的使用公共资源。信号量维护了一个许可集,我们在初始化Semaphore时需要为这个许可集传入一个数量值,该数量值代表同一时间能访问共享资源的线程数量。
线程可以通过acquire()方法获取到一个许可,然后对共享资源进行操作。注意如果许可集已分配完了,那么线程将进入等待状态,直到其他线程释放许可才有机会再获取许可,线程释放一个许可通过release()方法完成,”许可”将被归还给Semaphore。
具体常用的构造方法有:
Semaphore(int permits) 创建具有给定的许可数和非公平的公平设置的 Semapore
public Semaphore(int permits) {
// 默认创建非公平锁
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
// fair为true时,为公平锁
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
具体常用的方法有:
acquire()从此信号量获取一个许可,在获取到许可前一直将线程阻塞。
release()释放一个许可,将其返回给信号量
public void acquire() throws InterruptedException {
// 阻塞当前线程
sync.acquireSharedInterruptibly(1);
}
public void release() {
// 释放一个许可
sync.releaseShared(1);
}
设置许可数量 Semaphore semaphore = new Semaphore(3);
一般 acquire()都会抛出异常,release 在 finally 中执行
2. 案例演示
6辆汽车,停3个车位
public class SemaphoreTest {
public static void main(String[] args) {
//创建Semaphore,设置许可数量
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 获取许可,在获取到前,线程阻塞
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 找到了车位");
// 设置停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
// 离开车位
System.out.println(Thread.currentThread().getName()+"------离开了车位");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 释放许可
semaphore.release();
}
}, "车辆" + i).start();
}
}
}
执行结果:
车辆1 找到了车位
车辆3 找到了车位
车辆2 找到了车位
车辆1------离开了车位
车辆4 找到了车位
车辆2------离开了车位
车辆5 找到了车位
车辆5------离开了车位
车辆6 找到了车位
车辆3------离开了车位
车辆6------离开了车位
车辆4------离开了车位
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com