07-JUC辅助类CountDownLatch、CyclicBarrier、Semaphore

  1. 一、减少计数 CountDownLatch
    1. 1. 概述
    2. 2. 案例演示
  2. 二、循环栅栏 CyclicBarrier
    1. 1. 概述
    2. 2. 案例演示
  3. 三、信号灯 Semaphore
    1. 1. 概述
    2. 2. 案例演示

JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时 Lock 锁的频繁操作。这三种辅助类为:

  • CountDownLatch: 减少计数
  • CyclicBarrier: 循环栅栏
  • Semaphore: 信号灯

一、减少计数 CountDownLatch

1. 概述

该类的构造方法为 CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch

在这里插入代码片

/**
 * Constructs a {@code CountDownLatch} initialized with the given count.
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。具体步骤可以演化为定义一个类,减1操作,并等待到0,为0执行结果

两个常用的主要方法

  • await() 使当前线程在锁存器倒计数至零之前一直在等待,除非线程被中断
/**
 * Causes the current thread to wait until the latch has counted down to
 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
 *
 * <p>If the current count is zero then this method returns immediately.
 *
 * <p>If the current count is greater than zero then the current
 * thread becomes disabled for thread scheduling purposes and lies
 * dormant until one of two things happen:
 * <ul>
 * <li>The count reaches zero due to invocations of the
 * {@link #countDown} method; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread.
 * </ul>
 *
 * <p>If the current thread:
 * <ul>
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread's
 * interrupted status is cleared.
 *
 * @throws InterruptedException if the current thread is interrupted
 *         while waiting
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
  • countDown()递减锁存器的计数,如果计数达到零,将释放所有等待的线程
/**
 * Decrements the count of the latch, releasing all waiting threads if
 * the count reaches zero.
 *
 * <p>If the current count is greater than zero then it is decremented.
 * If the new count is zero then all waiting threads are re-enabled for
 * thread scheduling purposes.
 *
 * <p>If the current count equals zero then nothing happens.
 */
public void countDown() {
    sync.releaseShared(1);
}


/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}

2. 案例演示

6个同学陆续离开教室之后,班长才能锁门

演示没有CountDownLatch的情况

如果不加 CountDownLatch类,会出现线程混乱执行,同学还未离开教室班长就已经锁门了

public class NoCountDownLatch {
    //6个同学陆续离开教室之后,班长锁门
    public static void main(String[] args) throws InterruptedException {
        // 创建六个线程,模拟六个学生
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"离开教室");
            },String.valueOf(i)).start();
        }
        System.out.println(Thread.currentThread().getName()+"锁门");
    }
}

运行结果:

5离开教室
main锁门
2离开教室
1离开教室
6离开教室
3离开教室
4离开教室

添加CountDownLatch

public class CountDownLatchDemo {
    //6个同学陆续离开教室之后,班长锁门
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 号同学离开了教室");

                // 计数器减1
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }

        // 阻塞,等待计数器为0后,继续往下执行
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "执行了关门操作");
    }
}

运行结果:

2 号同学离开了教室
5 号同学离开了教室
6 号同学离开了教室
1 号同学离开了教室
4 号同学离开了教室
3 号同学离开了教室
main执行了关门操作

二、循环栅栏 CyclicBarrier

1. 概述

CyclicBarrier 字面意思是环栅栏,是 JUC 下的一个并发工具,跟 CountDownLatch 很相似,都可以使线程先等待然后再执行,但是它的功能比 CountDownLatch 更加复杂和强大, CountDownLatch 是一个或者多个线程等待另外一批线程执行完毕后,在接着执行,而 CyclicBarrier 是等待一批线程到达某个状态之后再同时开始执行,回环的意思是当所有的线程被释放后,CyclicBarrier 可以被重启,也就是可以重复使用。

CyclicBarrier

常用的构造方法有:CyclicBarrier(int parties,Runnable barrierAction) ,其底层代码如下

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    // 最后需要执行的方法
    this.barrierCommand = barrierAction;
}

创建一个新的CyclicBarrier,它将在 给定数量的参与者 (线程)处于等待状态时启动,并在启动barrier时执行 给定的屏障操作,该操作由最后一个进入barrier的线程操作

常用的方法有:

await() 在所有的参与者都已经在此barrier上调用await方法之前一直等待。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    // 同步锁
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
		// 每次执行 CyclicBarrier 一次障碍数会加一,距离目标障碍数-1
        int index = --count;
        // 当达到目标障碍数执行if内代码
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    // 执行Runnable代码
                    command.run();
                ranAction = true;
                nextGeneration();
                // 返回0
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 循环,直到触发、中断、中断或超时  
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

从底层代码可知, CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会+1,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。可以将 CyclicBarrier 理解为+1 操作(指与目标障碍数的距离)

2. 案例演示

代码案例:集齐7颗龙珠就可以召唤神龙

public class CyclicBarrirtTest {
    private static Integer NUMBER = 7;

    public static void main(String[] args) {
        // 每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。
        CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> System.out.println("集齐7颗龙珠,可以召唤神龙了"));

        for (int i = 1; i <= 7; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 号龙珠已收集");
                try {
                    // 执行 await() 方法,障碍数 +1
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            }, String.valueOf(i)).start();
        }
    }
}

执行结果

5 号龙珠已收集
1 号龙珠已收集
4 号龙珠已收集
2 号龙珠已收集
6 号龙珠已收集
3 号龙珠已收集
7 号龙珠已收集
集齐7颗龙珠,可以召唤神龙了

三、信号灯 Semaphore

1. 概述

信号量(Semaphore),又被称为信号灯,在多线程环境下用于协调各个线程, 以保证它们能够正确、合理的使用公共资源。信号量维护了一个许可集,我们在初始化Semaphore时需要为这个许可集传入一个数量值,该数量值代表同一时间能访问共享资源的线程数量。

线程可以通过acquire()方法获取到一个许可,然后对共享资源进行操作。注意如果许可集已分配完了,那么线程将进入等待状态,直到其他线程释放许可才有机会再获取许可,线程释放一个许可通过release()方法完成,”许可”将被归还给Semaphore。

具体常用的构造方法有:

Semaphore(int permits) 创建具有给定的许可数和非公平的公平设置的 Semapore

public Semaphore(int permits) {
    // 默认创建非公平锁
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    // fair为true时,为公平锁
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

具体常用的方法有:

acquire()从此信号量获取一个许可,在获取到许可前一直将线程阻塞。

release()释放一个许可,将其返回给信号量

public void acquire() throws InterruptedException {
    // 阻塞当前线程
    sync.acquireSharedInterruptibly(1);
}
public void release() {
    // 释放一个许可
    sync.releaseShared(1);
}

设置许可数量 Semaphore semaphore = new Semaphore(3);

一般 acquire()都会抛出异常,release 在 finally 中执行

2. 案例演示

6辆汽车,停3个车位

public class SemaphoreTest {
    public static void main(String[] args) {
        //创建Semaphore,设置许可数量
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    // 获取许可,在获取到前,线程阻塞
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " 找到了车位");
                    // 设置停车时间
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    // 离开车位
                    System.out.println(Thread.currentThread().getName()+"------离开了车位");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    // 释放许可
                    semaphore.release();
                }

            }, "车辆" + i).start();
        }
    }
}

执行结果:

车辆1 找到了车位
车辆3 找到了车位
车辆2 找到了车位
车辆1------离开了车位
车辆4 找到了车位
车辆2------离开了车位
车辆5 找到了车位
车辆5------离开了车位
车辆6 找到了车位
车辆3------离开了车位
车辆6------离开了车位
车辆4------离开了车位

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com