博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CountDownLatch与CyclicBarrier
阅读量:6039 次
发布时间:2019-06-20

本文共 9110 字,大约阅读时间需要 30 分钟。

对于AbstractQueuedSynchronizer衍生出来的并发工具类,这一篇再介绍俩。

场景1:有4个大文件的数据需要统计,最终将所有的统计结果进行加工,得到最后的分析数据。为了加速处理过程,当然是利用多线程:开启4个线程去分别统计每个文件的数据,开启1个线程对之前4个线程的数据加工。

这里的难点是加工线程必须等到4个统计线程都结束了,才能开始工作,不然加工出来的数据肯定是脏的。

如何保证这一点呢,synchronized、wait-notify、Condition这些机制实现起来好像特别麻烦,需要自己实现很多逻辑的控制。正是基于这个痛点,并发大师提供了一个工具类---CountDownLatch。

先来看看如何使用:

 

package countdownlatch;import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {    public static void main(String[] args) {        CountDownLatch latch = new CountDownLatch(4);                new Thread() {            @Override            public void run() {                System.out.println("加工线程:" + Thread.currentThread().getName() + "开始等待数据");                try {                    latch.await();                } catch (InterruptedException e) {                    e.printStackTrace();                }                System.out.println("数据都到齐了!!!");            }        }.start();                for (int i=0; i<4; i++) {            new Thread() {                @Override                public void run() {                    System.out.println(Thread.currentThread().getName() + "开始统计数据");                    try {                        sleep(1000);  // 模拟统计过程                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println(Thread.currentThread().getName() + "统计完成");                    latch.countDown();                }            }.start();        }    }}

 

运行结果

 
View Code

通过CountDownLatch的await和countDown方法轻松的实现了控制逻辑,从源码进去看看,发现await方法的执行逻辑跟上一篇的Semaphore的逻辑几乎一模一样,只不过tryAcquireShared方法的逻辑实现不一样

protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }

简单吧。CountDownLatch里面的内部类把从AbstractQueuedSynchronizer继承来的state属性,当作需等待的线程数量,tryAcquireShared方法只是判断这个数量是否到0了。如果没有到0,返回-1,后续会执行AbstractQueuedSynchronizer的doAcquireSharedInterruptibly方法,将当前线程封装成共享模式的节点,添加到等待队列。具体内容不再赘述。

再看看countDown方法的主要逻辑:

 

protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                if (c == 0)  // c为0说明已经release了,比如说4个已经线程完事,第5个线程又调用了countDown方法,不会产生任何影响                    return false;                int nextc = c-1;  // 只是减1,参数releases没起作用                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }

 

又见无限循环里面的CAS操作。每个被等待的线程调用一次countDown方法,state减1,最后一次调用的时候,nextc == 0为true,就会执行AbstractQueuedSynchronizer的doReleaseShared方法

private void doReleaseShared() {        /*         * Ensure that a release propagates, even if there are other         * in-progress acquires/releases.  This proceeds in the usual         * way of trying to unparkSuccessor of head if it needs         * signal. But if it does not, status is set to PROPAGATE to         * ensure that upon release, propagation continues.         * Additionally, we must loop in case a new node is added         * while we are doing this. Also, unlike other uses of         * unparkSuccessor, we need to know if CAS to reset status         * fails, if so rechecking.         */        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

虽然实现逻辑略显复杂,一句话概括就是,将等待队列里的等待线程都unpark,之前的例子就是将加工线程唤醒。

好了,CountDownLatch就到这里。

 

场景2:有4个大文件的数据需要处理,每个文件处理过程分2步,1)先检查文件数据是否正确,2)然后统计这个文件的某个指标;但是只要有一个文件检查没有完成(比如数据有错),所有的统计就失去意义。

思路:开启4个线程分别对应每个文件,每个线程执行完步骤1,不能马上执行步骤2,必须等待其他的3个线程都执行完步骤1,然后4个线程才能进行步骤2。

所以这里的难点是如何保证一个线程执行到某一点(步骤1完成),必须等待其他的线程也执行到这个点。

乍一看,好像是一个简单的线程通信问题。但是wait-notify、Condition的唤醒机制是一个线程唤醒另一个(或多个)线程,而这里是互相牵制,一个线程是不知道能不能进行唤醒操作的,因为有别的线程还没有执行完步骤1。

问题变得既抽象又复杂,但是并发大师有完美的解决方案---CyclicBarrier。

看例子

 

package cyclicbarrier;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest {    public static void main(String[] args) {        CyclicBarrier barrier = new CyclicBarrier(4);                for (int i=0; i<4; i++) {            new Thread() {                @Override                public void run() {                    System.out.println(Thread.currentThread().getName() + "开始检查文件");                    try {                        sleep(2000);  // 模拟检查过程                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println(Thread.currentThread().getName() + "检查文件结束,等待其他线程");                    try {                        barrier.await();  // 等待                    } catch (InterruptedException | BrokenBarrierException e) {                        e.printStackTrace();                    }                    System.out.println(Thread.currentThread().getName() + "统计");                }            }.start();        }    }}

 

运行结果

 
View Code

规整有序结果,说明线程之间的控制很到位,但是我们只是调用了一个await方法,那实现的代码必定复杂,硬着头皮看看

await方法的核心逻辑都在dowait方法里面:

/**     * Main barrier code, covering the various policies.     */    private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        final ReentrantLock lock = this.lock;        lock.lock();        try {            final Generation g = generation;            if (g.broken)                throw new BrokenBarrierException();            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            int index = --count;            if (index == 0) {  // tripped                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    if (command != null)                        command.run();                    ranAction = true;                    nextGeneration();                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }            // loop until tripped, broken, interrupted, or timed out            for (;;) {                try {                    if (!timed)                        trip.await();                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                    }                }                if (g.broken)                    throw new BrokenBarrierException();                if (g != generation)                    return index;                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            lock.unlock();        }    }

这里的实现还是借助了ReentrantLock和Condition,看看CyclicBarrier里面的属性

/** The lock for guarding barrier entry */    private final ReentrantLock lock = new ReentrantLock();    /** Condition to wait on until tripped */    private final Condition trip = lock.newCondition();    /** The number of parties */    private final int parties;    /* The command to run when tripped */    private final Runnable barrierCommand;    /** The current generation */    private Generation generation = new Generation();    /**     * Number of parties still waiting. Counts down from parties to 0     * on each generation.  It is reset to parties on each new     * generation or when broken.     */    private int count;

parties是刚开始互相等待的线程数;count是还在让别的线程等待的线程数;barrierCommand是所有线程都执行到指定位置之后马上执行的任务,就像场景1中最后的加工任务;generation标识分代信息。举个例子,左轮手枪可以装6发子弹,某一时刻,还剩2发;parties就是6,count就是2;当6发子弹全部打出,如果要执行一个动作(比如维修、清洗),这个动作就是barrierCommand;然后需要重新装入子弹,这就是一个换代的过程。

CyclicBarrier引入分代的概念就是想重复利用,Cyclic就是可循环的意思。

再回到dowait方法,参数timed和nanos代表是否考虑超时的问题,大致梳理一下执行流程:

1、先加锁,同一时刻,只有一个线程可以执行后续逻辑。

2、count减1后,如果为0,说明这个线程就是最后一个被等待的线程,就可以执行barrierCommand,然后执行更新换代:先唤醒所有的还在等待的线程,然后将parties、count、generation统统更新;

3、如果不为0,线程自己也会被加入Condition的等待队列

4、时刻要考虑超时问题、中断异常处理、换代的意外等

 

最后比较一下CountDownLatch和CyclicBarrier:

1、从场景1和场景2来看,这两个工具类的关注点不一样,CountDownLatch关注的是某一类线程等待另一类线程的信号(执行countDown方法),而CyclicBarrier关注的是同一类线程互相等待彼此的信号(执行await方法)

2、CyclicBarrier可以重复使用,而CountDownLatch只能使用一次

 

from: https://www.cnblogs.com/cz123/p/7503545.html

你可能感兴趣的文章
实现锁的多种方式和锁的高级用法
查看>>
C语言-一个fopen函数中未使用二进制模式(b)引发的血案
查看>>
Codeforces Round #260 (Div. 1) C. Civilization 并查集,直径
查看>>
Objective-C中常用的结构体NSRange,NSPoint,NSSize(CGSize),NSRect
查看>>
关于spark standalone模式下的executor问题
查看>>
TC SRM 664 div2 B BearPlaysDiv2 bfs
查看>>
Retrofit全攻略——基础篇
查看>>
代理模式
查看>>
具体问题具体分析
查看>>
【SqlServer系列】表达式(expression)
查看>>
maven与gradle的对比
查看>>
异常备忘:java.lang.UnsupportedClassVersionError: Bad version number in .class file
查看>>
uasy-datetimebox的使用
查看>>
Android Home键监听
查看>>
Java JVM虚拟机选项Xms/Xmx/PermSize/MaxPermSize(转)
查看>>
linux convert命令安装及使用
查看>>
JavaWeb(一)Servlet中乱码解决与转发和重定向的区别
查看>>
laravel5.5 Syntax error or access violation: 1071 Specified key was too long
查看>>
分布式锁与实现(一)——基于Redis实现
查看>>
RDLC报表显示图片
查看>>